From c0f3d38ad92a364e360c281f8889daa65cdef861 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 13 Feb 2024 19:57:43 +0100 Subject: [PATCH] Speed up request buffered hashes (#6318) Co-authored-by: Bjerg Co-authored-by: back <104515688+justcode740@users.noreply.github.com> Co-authored-by: Matthias Seitz --- crates/net/eth-wire/src/types/broadcast.rs | 166 ++- crates/net/network/src/cache.rs | 2 +- crates/net/network/src/metrics.rs | 20 +- crates/net/network/src/peers/manager.rs | 10 +- crates/net/network/src/peers/mod.rs | 6 +- crates/net/network/src/session/config.rs | 6 +- .../net/network/src/transactions/constants.rs | 201 +++ .../net/network/src/transactions/fetcher.rs | 1077 +++++++++++------ crates/net/network/src/transactions/mod.rs | 630 +++++----- crates/transaction-pool/src/lib.rs | 2 +- crates/transaction-pool/src/noop.rs | 3 +- crates/transaction-pool/src/pool/mod.rs | 8 +- crates/transaction-pool/src/traits.rs | 4 +- 13 files changed, 1306 insertions(+), 829 deletions(-) create mode 100644 crates/net/network/src/transactions/constants.rs diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index b7b95d0441e8..ab2cc5e481c4 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -4,7 +4,8 @@ use crate::{EthMessage, EthVersion}; use alloy_rlp::{ Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper, }; -use derive_more::{Deref, DerefMut, IntoIterator}; + +use derive_more::{Constructor, Deref, DerefMut, IntoIterator}; use reth_codecs::derive_arbitrary; use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128}; @@ -451,10 +452,12 @@ pub trait HandleAnnouncement { /// Returns the number of entries. fn len(&self) -> usize; - /// Retain only entries for which the hash in the entry, satisfies a given predicate. - fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool); + /// Retain only entries for which the hash in the entry satisfies a given predicate, return + /// the rest. + fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self; - /// Returns the announcement version, either [`EthVersion::Eth66`] or [`EthVersion::Eth68`]. + /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or + /// [`Eth68`](EthVersion::Eth68). 
fn msg_version(&self) -> EthVersion; } @@ -467,10 +470,10 @@ impl HandleAnnouncement for NewPooledTransactionHashes { self.len() } - fn retain_by_hash(&mut self, f: impl FnMut(TxHash) -> bool) { + fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self { match self { - NewPooledTransactionHashes::Eth66(msg) => msg.retain_by_hash(f), - NewPooledTransactionHashes::Eth68(msg) => msg.retain_by_hash(f), + NewPooledTransactionHashes::Eth66(msg) => Self::Eth66(msg.retain_by_hash(f)), + NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)), } } @@ -488,19 +491,28 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 { self.hashes.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { let mut indices_to_remove = vec![]; - for (i, &hash) in self.hashes.iter().enumerate() { + for (i, hash) in self.hashes.iter().enumerate() { if !f(hash) { indices_to_remove.push(i); } } + let mut removed_hashes = Vec::with_capacity(indices_to_remove.len()); + let mut removed_types = Vec::with_capacity(indices_to_remove.len()); + let mut removed_sizes = Vec::with_capacity(indices_to_remove.len()); + for index in indices_to_remove.into_iter().rev() { - self.hashes.remove(index); - self.types.remove(index); - self.sizes.remove(index); + let hash = self.hashes.remove(index); + removed_hashes.push(hash); + let ty = self.types.remove(index); + removed_types.push(ty); + let size = self.sizes.remove(index); + removed_sizes.push(size); } + + Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes } } fn msg_version(&self) -> EthVersion { @@ -517,17 +529,22 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 { self.0.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { let mut indices_to_remove = vec![]; - for (i, &hash) in self.0.iter().enumerate() { + for (i, hash) in self.0.iter().enumerate() { if !f(hash) { indices_to_remove.push(i); } } + let mut removed_hashes = Vec::with_capacity(indices_to_remove.len()); + for index in indices_to_remove.into_iter().rev() { - self.0.remove(index); + let hash = self.0.remove(index); + removed_hashes.push(hash); } + + Self(removed_hashes) } fn msg_version(&self) -> EthVersion { @@ -538,42 +555,53 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 { /// Announcement data that has been validated according to the configured network. For an eth68 /// announcement, values of the map are `Some((u8, usize))` - the tx metadata. For an eth66 /// announcement, values of the map are `None`. -#[derive(Debug, IntoIterator)] +#[derive(Debug, Deref, DerefMut, IntoIterator, Constructor)] pub struct ValidAnnouncementData { + #[deref] + #[deref_mut] #[into_iterator] data: HashMap>, version: EthVersion, } impl ValidAnnouncementData { - /// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`] - /// announcement data. + /// Returns a new [`ValidAnnouncementData`] wrapper around validated + /// [`Eth68`](EthVersion::Eth68) announcement data. pub fn new_eth68(data: HashMap>) -> Self { - Self { data, version: EthVersion::Eth68 } + Self::new(data, EthVersion::Eth68) } - /// Returns a new [`ValidAnnouncementData`] wrapper around validated [`EthVersion::Eth68`] - /// announcement data. + /// Returns a new [`ValidAnnouncementData`] wrapper around validated + /// [`Eth68`](EthVersion::Eth68) announcement data. 
pub fn new_eth66(data: HashMap>) -> Self { - Self { data, version: EthVersion::Eth66 } + Self::new(data, EthVersion::Eth66) } - /// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth68`] + /// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth68`](EthVersion::Eth68) /// announcement. pub fn empty_eth68() -> Self { - Self { data: HashMap::new(), version: EthVersion::Eth68 } + Self::new_eth68(HashMap::new()) } - /// Returns a new [`ValidAnnouncementData`] with empty data for an [`EthVersion::Eth66`] + /// Returns a new [`ValidAnnouncementData`] with empty data from an [`Eth66`](EthVersion::Eth66) /// announcement. pub fn empty_eth66() -> Self { - Self { data: HashMap::new(), version: EthVersion::Eth66 } + Self::new_eth66(HashMap::new()) } /// Destructs returning the validated data. pub fn into_data(self) -> HashMap> { self.data } + + /// Destructs returning only the valid hashes and the announcement message version. Caution! If + /// this is [`Eth68`](EthVersion::Eth68)announcement data, the metadata must be cached + /// before call. + pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) { + let hashes = self.data.into_keys().collect::>(); + + (RequestTxHashes::new(hashes), self.version) + } } impl HandleAnnouncement for ValidAnnouncementData { @@ -585,8 +613,14 @@ impl HandleAnnouncement for ValidAnnouncementData { self.data.len() } - fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { - self.data.retain(|&hash, _| f(hash)) + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { + let data = std::mem::take(&mut self.data); + + let (keep, rest) = data.into_iter().partition(|(hash, _)| f(hash)); + + self.data = keep; + + ValidAnnouncementData::new(rest, self.version) } fn msg_version(&self) -> EthVersion { @@ -594,74 +628,36 @@ impl HandleAnnouncement for ValidAnnouncementData { } } -/// Hashes extracted from valid announcement data. For an eth68 announcement, this means the eth68 -/// metadata should have been cached already. -#[derive(Debug, Deref, DerefMut, IntoIterator)] -pub struct ValidTxHashes { +/// Hashes to request from a peer. +#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)] +pub struct RequestTxHashes { #[deref] #[deref_mut] - #[into_iterator] + #[into_iterator(owned, ref)] hashes: Vec, - version: EthVersion, } -impl ValidTxHashes { - /// Returns a new [`ValidTxHashes`] wrapper around validated hashes. Takes a list of validated - /// hashes as parameter along with the eth version. - pub fn new(hashes: Vec, version: EthVersion) -> Self { - Self { hashes, version } - } - - /// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid - /// [`EthVersion::Eth68`] announcement data. Takes a list of validated hashes as parameter. - pub fn new_eth68(hashes: Vec) -> Self { - Self::new(hashes, EthVersion::Eth68) - } - - /// Returns a new [`ValidTxHashes`] wrapper around validated hashes from valid - /// [`EthVersion::Eth66`] announcement data. Takes a list of validated hashes as parameter. - pub fn new_eth66(hashes: Vec) -> Self { - Self::new(hashes, EthVersion::Eth66) - } - - /// Returns a new [`ValidTxHashes`] with empty hashes. - pub fn empty(version: EthVersion) -> Self { - Self { hashes: vec![], version } - } - - /// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth68`] - /// announcement. 
- pub fn empty_eth68() -> Self { - Self::empty(EthVersion::Eth68) - } - - /// Returns a new [`ValidTxHashes`] with empty hashes for an [`EthVersion::Eth66`] - /// announcement. - pub fn empty_eth66() -> Self { - Self::empty(EthVersion::Eth66) - } - - /// Destructs returning the validated hashes. - pub fn into_hashes(self) -> Vec { - self.hashes +impl RequestTxHashes { + /// Returns a new [`RequestTxHashes`] with given capacity for hashes. Caution! Make sure to + /// call [`Vec::shrink_to_fit`] on [`RequestTxHashes`] when full, especially where it will be + /// stored in its entirety like in the future waiting for a + /// [`GetPooledTransactions`](crate::GetPooledTransactions) request to resolve. + pub fn with_capacity(capacity: usize) -> Self { + Self::new(Vec::with_capacity(capacity)) } } -impl HandleAnnouncement for ValidTxHashes { - fn is_empty(&self) -> bool { - self.hashes.is_empty() - } +impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes { + fn from_iter)>>(iter: I) -> Self { + let mut hashes = Vec::with_capacity(32); - fn len(&self) -> usize { - self.hashes.len() - } + for (hash, _) in iter { + hashes.push(hash); + } - fn retain_by_hash(&mut self, mut f: impl FnMut(TxHash) -> bool) { - self.hashes.retain(|&hash| f(hash)) - } + hashes.shrink_to_fit(); - fn msg_version(&self) -> EthVersion { - self.version + RequestTxHashes::new(hashes) } } diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 1c8daffaada3..f88594988d03 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -118,7 +118,7 @@ where } /// Wrapper of [`schnellru::LruMap`] that implements [`fmt::Debug`]. -#[derive(Deref, DerefMut)] +#[derive(Deref, DerefMut, Default)] pub struct LruMap(schnellru::LruMap) where K: Hash + PartialEq, diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index a25c84e88fb2..15384b5ca10b 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -53,7 +53,7 @@ pub struct SessionManagerMetrics { pub(crate) total_dial_successes: Counter, } -/// Metrics for the TransactionsManager +/// Metrics for the [`TransactionsManager`](crate::transactions::TransactionsManager). #[derive(Metrics)] #[metrics(scope = "network")] pub struct TransactionsManagerMetrics { @@ -67,10 +67,26 @@ pub struct TransactionsManagerMetrics { pub(crate) messages_with_already_seen_transactions: Counter, /// Number of transactions about to be imported into the pool. pub(crate) pending_pool_imports: Gauge, - /// Currently active outgoing GetPooledTransactions requests. + /// Number of inflight requests at which the + /// [`TransactionPool`](reth_transaction_pool::TransactionPool) is considered to be at + /// capacity. Note, this is not a limit to the number of inflight requests, but a health + /// measure. + pub(crate) capacity_pending_pool_imports: Counter, + /// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) + /// requests. pub(crate) inflight_transaction_requests: Gauge, + /// Number of inflight requests at which the + /// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at + /// capacity. Note, this is not a limit to the number of inflight requests, but a health + /// measure. + pub(crate) capacity_inflight_requests: Counter, + /// Hashes in currently active outgoing + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests. 
+ pub(crate) hashes_inflight_transaction_requests: Gauge, /// How often we failed to send a request to the peer because the channel was full. pub(crate) egress_peer_channel_full: Counter, + /// Total number of hashes pending fetch. + pub(crate) hashes_pending_fetch: Gauge, } /// Metrics for Disconnection types diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index e3ea70d7619e..2a2a51838871 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -2,8 +2,8 @@ use crate::{ error::{BackoffKind, SessionError}, peers::{ reputation::{is_banned_reputation, DEFAULT_REPUTATION}, - ReputationChangeWeights, DEFAULT_MAX_CONCURRENT_DIALS, DEFAULT_MAX_PEERS_INBOUND, - DEFAULT_MAX_PEERS_OUTBOUND, + ReputationChangeWeights, DEFAULT_MAX_COUNT_CONCURRENT_DIALS, + DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND, }, session::{Direction, PendingSessionHandshakeError}, swarm::NetworkConnectionState, @@ -877,9 +877,9 @@ impl Default for ConnectionInfo { ConnectionInfo { num_outbound: 0, num_inbound: 0, - max_outbound: DEFAULT_MAX_PEERS_OUTBOUND, - max_inbound: DEFAULT_MAX_PEERS_INBOUND, - max_concurrent_outbound_dials: DEFAULT_MAX_CONCURRENT_DIALS, + max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize, + max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize, + max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_DIALS, } } } diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs index 84f941cc59da..8d0a86941d56 100644 --- a/crates/net/network/src/peers/mod.rs +++ b/crates/net/network/src/peers/mod.rs @@ -9,10 +9,10 @@ pub use reputation::ReputationChangeWeights; pub use reth_network_api::PeerKind; /// Maximum number of available slots for outbound sessions. -pub(crate) const DEFAULT_MAX_PEERS_OUTBOUND: usize = 100; +pub(crate) const DEFAULT_MAX_COUNT_PEERS_OUTBOUND: u32 = 100; /// Maximum number of available slots for inbound sessions. -pub(crate) const DEFAULT_MAX_PEERS_INBOUND: usize = 30; +pub(crate) const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30; /// Maximum number of available slots concurrent outgoing dials. -pub(crate) const DEFAULT_MAX_CONCURRENT_DIALS: usize = 10; +pub(crate) const DEFAULT_MAX_COUNT_CONCURRENT_DIALS: usize = 10; diff --git a/crates/net/network/src/session/config.rs b/crates/net/network/src/session/config.rs index 038297b2bd8d..334f8bd56b5e 100644 --- a/crates/net/network/src/session/config.rs +++ b/crates/net/network/src/session/config.rs @@ -1,7 +1,7 @@ //! Configuration types for [SessionManager](crate::session::SessionManager). use crate::{ - peers::{DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND}, + peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}, session::{Direction, ExceedsSessionLimit}, }; use std::time::Duration; @@ -52,7 +52,9 @@ impl Default for SessionsConfig { // `poll`. // The default is twice the maximum number of available slots, if all slots are occupied // the buffer will have capacity for 3 messages per session (average). 
- session_event_buffer: (DEFAULT_MAX_PEERS_OUTBOUND + DEFAULT_MAX_PEERS_INBOUND) * 2, + session_event_buffer: (DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize + + DEFAULT_MAX_COUNT_PEERS_INBOUND as usize) * + 2, limits: Default::default(), initial_internal_request_timeout: INITIAL_REQUEST_TIMEOUT, protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT, diff --git a/crates/net/network/src/transactions/constants.rs b/crates/net/network/src/transactions/constants.rs new file mode 100644 index 000000000000..8693204c65cc --- /dev/null +++ b/crates/net/network/src/transactions/constants.rs @@ -0,0 +1,201 @@ +/* ==================== BROADCAST ==================== */ + +/// Soft limit for the number of hashes in a +/// [`NewPooledTransactionHashes`](reth_eth_wire::NewPooledTransactionHashes) broadcast message. +/// Spec'd at 4096 hashes. +/// +/// +pub const SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE: usize = 4096; + +/// Default soft limit for the byte size of a [`Transactions`](reth_eth_wire::Transactions) +/// broadcast message. Default is 128 KiB. +pub const DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE: usize = 128 * 1024; + +/* ================ REQUEST-RESPONSE ================ */ + +/// Recommended soft limit for the number of hashes in a +/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 256 hashes +/// (8 KiB). +/// +/// +pub const SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST: usize = 256; + +/// Soft limit for the byte size of a [`PooledTransactions`](reth_eth_wire::PooledTransactions) +/// response on assembling a [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) +/// request. Spec'd at 2 MiB. +/// +/// . +pub const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE: usize = 2 * 1024 * 1024; + +pub mod tx_manager { + use super::{ + tx_fetcher::DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS, + SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST, + }; + + /// Default limit for number transactions to keep track of for a single peer, for transactions + /// that the peer's pool and local pool have in common. Default is 10 KiB. + pub const DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL: usize = 10 * 1024; + + /// Default limit for the number of transactions to keep track of for a single peer, for + /// transactions that are in the peer's pool but maybe not in the local pool yet. + pub const DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL: usize = 10 * 1024; + + /// Default maximum pending pool imports to tolerate. Default is + /// [`SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST`], which is spec'd at 256 + /// hashes, multiplied by [`DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS `], which defaults to 130 + /// requests, so 33 280 imports. 
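// --- Illustrative sketch, not part of the applied diff -------------------------------------
// Spells out the arithmetic behind the pending-pool-imports ceiling documented above. The
// local names are hypothetical stand-ins for the constants defined in this module.
fn _sketch_max_pending_pool_imports() {
    let hashes_per_request = 256; // SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST
    let max_concurrent_requests = 30 + 100; // default inbound + outbound peer slots = 130
    assert_eq!(hashes_per_request * max_concurrent_requests, 33_280);
}
// --------------------------------------------------------------------------------------------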
+ pub const DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS: usize = + SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST * + DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize; +} + +pub mod tx_fetcher { + use crate::peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}; + + use super::{ + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, + SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST, + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + }; + + /* ============== SCALARS OF MESSAGES ============== */ + + /// Default soft limit for the byte size of a + /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a + /// [`GetPooledTransactions`](reth_eth_wire::PooledTransactions) request. This defaults to less + /// than the [`SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE`], at 2 MiB, used when + /// assembling a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response. Default + /// is 128 KiB. + pub const DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST: usize = 128 * 1024; + + /* ==================== RETRIES ==================== */ + + /// Default maximum request retires per [`TxHash`](reth_primitives::TxHash). Note, this is reset + /// should the [`TxHash`](reth_primitives::TxHash) re-appear in an announcement after it has + /// been evicted from the hashes pending fetch cache, i.e. the counter is restarted. If this + /// happens, it is likely a very popular transaction, that should and can indeed be fetched + /// hence this behaviour is favourable. Default is 2 retries. + pub const DEFAULT_MAX_RETRIES: u8 = 2; + + /// Default number of alternative peers to keep track of for each transaction pending fetch. At + /// most [`DEFAULT_MAX_RETRIES`], which defaults to 2 peers, can ever be needed per peer. + /// Default is the sum of [`DEFAULT_MAX_RETRIES`] and + /// [`DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS`], which defaults to 1 peer, so 3 peers. + pub const DEFAULT_MAX_COUNT_FALLBACK_PEERS: u8 = + DEFAULT_MAX_RETRIES + DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS; + + /// Default marginal on fallback peers. This is the case, since a transaction is only requested + /// once from each individual peer. Default is 1 peer. + const DEFAULT_MARGINAL_COUNT_FALLBACK_PEERS: u8 = 1; + + /* ==================== CONCURRENCY ==================== */ + + /// Default maximum concurrent [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) + /// requests. Default is the product of [`DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER`], + /// which defaults to 1 request, and the sum of [`DEFAULT_MAX_COUNT_PEERS_INBOUND`] and + /// [`DEFAULT_MAX_COUNT_PEERS_OUTBOUND`], which default to 30 and 100 peers respectively, so + /// 130 requests. + pub const DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS: u32 = + DEFAULT_MAX_COUNT_PEERS_INBOUND + DEFAULT_MAX_COUNT_PEERS_OUTBOUND; + + /// Default maximum number of concurrent + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)s to allow per peer. This + /// number reflects concurrent requests for different hashes. Default is 1 request. + pub const DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER: u8 = 1; + + /* =============== HASHES PENDING FETCH ================ */ + + /// Default limit for number of transactions waiting for an idle peer to be fetched from. + /// Default is 100 times the [`SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST`], + /// which defaults to 256 hashes, so 25 600 hashes. 
+ pub const DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH: usize = + 100 * SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST; + + /// Default maximum number of hashes pending fetch to tolerate at any time. Default is half of + /// [`DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH`], which defaults to 25 600 hashes, so 12 800 + /// hashes. + pub const DEFAULT_MAX_COUNT_PENDING_FETCH: usize = DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH / 2; + + /* ====== LIMITED CAPACITY ON FETCH PENDING HASHES ====== */ + + /// Default budget for finding an idle fallback peer for any hash pending fetch, when said + /// search is budget constrained. Default is a sixth of [`DEFAULT_MAX_COUNT_PENDING_FETCH`], + /// which defaults to 12 800 hashes (the breadth of the search), divided by + /// [`DEFAULT_MAX_COUNT_FALLBACK_PEERS`], which defaults to 3 peers (the depth of the search), + /// so the 711 lru hashes in the pending hashes cache. + pub const DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER: usize = + DEFAULT_MAX_COUNT_PENDING_FETCH / 6 / DEFAULT_MAX_COUNT_FALLBACK_PEERS as usize; + + /// Default budget for finding hashes in the intersection of transactions announced by a peer + /// and in the cache of hashes pending fetch, when said search is budget constrained. Default + /// is a sixth of [`DEFAULT_MAX_COUNT_PENDING_FETCH`], which defaults to 12 800 hashes (the + /// breadth of the search), so 2133 lru hashes in the pending hashes cache. + pub const DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH: usize = + DEFAULT_MAX_COUNT_PENDING_FETCH / 6; + + /* ====== SCALARS FOR USE ON FETCH PENDING HASHES ====== */ + + /// Default soft limit for the number of hashes in a + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request, when it is filled + /// from hashes pending fetch. Default is half of the + /// [`SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST`] which by spec is 256 + /// hashes, so 128 hashes. + pub const DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES: + usize = SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST / 2; + + /// Default soft limit for a [`PooledTransactions`](reth_eth_wire::PooledTransactions) response + /// when it's used as expected response in calibrating the filling of a + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request, when the request + /// is filled from hashes pending fetch. Default is half of + /// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST`], + /// which defaults to 128 KiB, so 64 KiB. + pub const DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES: + usize = DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST / 2; + + /// Default max inflight request when fetching pending hashes. Default is half of + /// [`DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS`], which defaults to 130 requests, so 65 requests. + pub const DEFAULT_MAX_COUNT_INFLIGHT_REQUESTS_ON_FETCH_PENDING_HASHES: usize = + DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize / 2; + + /// Default divisor of the max inflight request when calculating search breadth of the search + /// for any idle peer to which to send a request filled with hashes pending fetch. The max + /// inflight requests is configured in + /// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo). Default + /// is 3 requests. 
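// --- Illustrative sketch, not part of the applied diff -------------------------------------
// One plausible reading of how a divisor like the one documented above is applied: the
// max-inflight-requests bound divided by the divisor gives the breadth of the search for an
// idle fallback peer. The exact formula lives in the fetcher's `search_breadth_budget_*`
// helpers, which are not shown in this hunk, so treat the function below as a guess.
fn _sketch_search_breadth_budget(max_inflight_requests: usize, divisor: usize) -> usize {
    max_inflight_requests / divisor // e.g. 130 / 3 ~= 43 pending hashes inspected per search
}
// --------------------------------------------------------------------------------------------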
+ pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER: usize = 3; + + /// Default divisor of the max inflight request when calculating search breadth of the search + /// for the intersection of hashes announced by a peer and hashes pending fetch. The max + /// inflight requests is configured in + /// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo). Default + /// is 2 requests. + pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION: usize = 2; + + // Default divisor to the max pending pool imports when calculating search breadth of the + /// search for any idle peer to which to send a request filled with hashes pending fetch. + /// The max pending pool imports is configured in + /// [`PendingPoolImportsInfo`](crate::transactions::PendingPoolImportsInfo). Default + /// is 4 requests. + pub const DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER: usize = 4; + + /// Default divisor to the max pending pool imports when calculating search breadth of the + /// search for any idle peer to which to send a request filled with hashes pending fetch. + /// The max pending pool imports is configured in + /// [`PendingPoolImportsInfo`](crate::transactions::PendingPoolImportsInfo). Default + /// is 3 requests. + pub const DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION: usize = 3; + + /* ================== ROUGH MEASURES ================== */ + + /// Average byte size of an encoded transaction. Default is + /// [`SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE`], which defaults to 2 MiB, + /// divided by [`SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`], which + /// is spec'd at 4096 hashes, so 521 bytes. + pub const AVERAGE_BYTE_SIZE_TX_ENCODED: usize = + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE / + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; + + /// Median observed size in bytes of a small encoded legacy transaction. Default is 120 bytes. + pub const MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED: usize = 120; +} diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index b9c28cf25fd7..5d527206481a 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -2,13 +2,17 @@ use crate::{ cache::{LruCache, LruMap}, message::PeerRequest, }; +use derive_more::Constructor; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use pin_project::pin_project; -use reth_eth_wire::{GetPooledTransactions, HandleAnnouncement, ValidTxHashes}; +use reth_eth_wire::{ + GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData, +}; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; use schnellru::{ByLength, Unlimited}; use std::{ + collections::HashMap, num::NonZeroUsize, pin::Pin, task::{Context, Poll}, @@ -17,59 +21,37 @@ use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError} use tracing::{debug, trace}; use super::{ + constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST}, AnnouncementFilter, Peer, PooledTransactions, - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, }; -/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer. 
-pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1; - -/// How many peers we keep track of for each missing transaction. -pub(super) const MAX_ALTERNATIVE_PEERS_PER_TX: u8 = - MAX_REQUEST_RETRIES_PER_TX_HASH + MARGINAL_FALLBACK_PEERS_PER_TX; - -/// Marginal on fallback peers. If all fallback peers are idle, at most -/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`] of them can ever be needed. -const MARGINAL_FALLBACK_PEERS_PER_TX: u8 = 1; - -/// Maximum request retires per [`TxHash`]. Note, this is reset should the [`TxHash`] re-appear in -/// an announcement after it has been ejected from the hash buffer. -const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2; - -/// Maximum concurrent [`GetPooledTxRequest`]s. -const MAX_CONCURRENT_TX_REQUESTS: u32 = 10000; - -/// Cache limit of transactions waiting for idle peer to be fetched. -const MAX_CAPACITY_BUFFERED_HASHES: usize = 100 * GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES; - -/// Recommended soft limit for the number of hashes in a GetPooledTransactions message (8kb) -/// -/// -const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256; - /// The type responsible for fetching missing transactions from peers. /// /// This will keep track of unique transaction hashes that are currently being fetched and submits /// new requests on announced hashes. #[derive(Debug)] #[pin_project] -pub(super) struct TransactionFetcher { - /// All peers to which a request for pooled transactions is currently active. Maps 1-1 to - /// `inflight_requests`. +pub(crate) struct TransactionFetcher { + /// All peers with to which a [`GetPooledTransactions`] request is inflight. pub(super) active_peers: LruMap, - /// All currently active requests for pooled transactions. + /// All currently active [`GetPooledTransactions`] requests. + /// + /// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher. + /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to + /// be fetched. #[pin] pub(super) inflight_requests: FuturesUnordered, - /// Hashes that are awaiting an idle peer so they can be fetched. - // todo: store buffered eth68 and eth66 hashes separately - pub(super) buffered_hashes: LruCache, - /// Tracks all hashes that are currently being fetched or are buffered, mapping them to - /// request retries and last recently seen fallback peers (max one request try for any peer). - pub(super) unknown_hashes: LruMap), Unlimited>, - /// Size metadata for unknown eth68 hashes. - pub(super) eth68_meta: LruMap, + /// Hashes that are awaiting an idle fallback peer so they can be fetched. + /// + /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for + /// which a [`GetPooledTransactions`] request is inflight. + pub(super) hashes_pending_fetch: LruCache, + /// Tracks all hashes in the transaction fetcher. + pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap, /// Filter for valid eth68 announcements. pub(super) filter_valid_hashes: AnnouncementFilter, + /// Info on capacity of the transaction fetcher. + pub(super) info: TransactionFetcherInfo, } // === impl TransactionFetcher === @@ -82,17 +64,16 @@ impl TransactionFetcher { I: IntoIterator, { for hash in hashes { - self.unknown_hashes.remove(&hash); - self.eth68_meta.remove(&hash); - self.buffered_hashes.remove(&hash); + self.hashes_fetch_inflight_and_pending_fetch.remove(&hash); + self.hashes_pending_fetch.remove(&hash); } } /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`]. 
- fn decrement_inflight_request_count_for(&mut self, peer_id: PeerId) { + fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) { let remove = || -> bool { - if let Some(inflight_count) = self.active_peers.get(&peer_id) { - if *inflight_count <= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { + if let Some(inflight_count) = self.active_peers.get(peer_id) { + if *inflight_count <= DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { return true } *inflight_count -= 1; @@ -101,196 +82,241 @@ impl TransactionFetcher { }(); if remove { - self.active_peers.remove(&peer_id); + self.active_peers.remove(peer_id); } } - /// Returns `true` if peer is idle. - pub(super) fn is_idle(&self, peer_id: PeerId) -> bool { - let Some(inflight_count) = self.active_peers.peek(&peer_id) else { return true }; - if *inflight_count < MAX_CONCURRENT_TX_REQUESTS_PER_PEER { + /// Returns `true` if peer is idle with respect to `self.inflight_requests`. + pub(super) fn is_idle(&self, peer_id: &PeerId) -> bool { + let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true }; + if *inflight_count < DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { return true } false } - /// Returns any idle peer for the given hash. Writes peer IDs of any ended sessions to buffer - /// passed as parameter. + /// Returns any idle peer for the given hash. pub(super) fn get_idle_peer_for( &self, hash: TxHash, - ended_sessions_buf: &mut Vec, - is_session_active: impl Fn(PeerId) -> bool, - ) -> Option { - let (_, peers) = self.unknown_hashes.peek(&hash)?; - - for &peer_id in peers.iter() { - if self.is_idle(peer_id) { - if is_session_active(peer_id) { - return Some(peer_id) - } else { - ended_sessions_buf.push(peer_id); - } + is_session_active: impl Fn(&PeerId) -> bool, + ) -> Option<&PeerId> { + let TxFetchMetadata { fallback_peers, .. } = + self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?; + + for peer_id in fallback_peers.iter() { + if self.is_idle(peer_id) && is_session_active(peer_id) { + return Some(peer_id) } } None } - /// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes. - pub(super) fn pack_hashes( + /// Returns any idle peer for any hash pending fetch. If one is found, the corresponding + /// hash is written to the request buffer that is passed as parameter. + /// + /// Loops through the hashes pending fetch in lru order until one is found with an idle + /// fallback peer, or the budget passed as parameter is depleted, whatever happens first. 
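// --- Illustrative sketch, not part of the applied diff -------------------------------------
// Stripped-down version of the budget-bounded search described above: walk the pending
// hashes in lru order, stop at the first one that has an idle fallback peer, or give up once
// the optional budget is spent. Types are simplified stand-ins (u64 hashes, string peer ids).
fn _sketch_find_idle_fallback_peer(
    pending_hashes: &[u64],
    idle_peer_for: impl Fn(u64) -> Option<&'static str>,
    mut budget: Option<usize>,
) -> Option<(u64, &'static str)> {
    for &hash in pending_hashes {
        if let Some(peer) = idle_peer_for(hash) {
            return Some((hash, peer)) // this hash is moved into the request buffer
        }
        if let Some(b) = budget.as_mut() {
            *b = b.saturating_sub(1);
            if *b == 0 {
                return None // budget depleted before any idle fallback peer was found
            }
        }
    }
    None
}
// --------------------------------------------------------------------------------------------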
+ pub(super) fn find_any_idle_fallback_peer_for_any_pending_hash( &mut self, - hashes: &mut ValidTxHashes, - peer_id: PeerId, - ) -> ValidTxHashes { - if hashes.is_empty() { - return ValidTxHashes::empty(hashes.msg_version()) + hashes_to_request: &mut RequestTxHashes, + is_session_active: impl Fn(&PeerId) -> bool, + mut budget: Option, // search fallback peers for max `budget` lru pending hashes + ) -> Option { + let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter(); + + let idle_peer = loop { + let &hash = hashes_pending_fetch_iter.next()?; + + let idle_peer = self.get_idle_peer_for(hash, &is_session_active); + + if idle_peer.is_some() { + hashes_to_request.push(hash); + break idle_peer.copied() + } + + if let Some(ref mut bud) = budget { + *bud = bud.saturating_sub(1); + if *bud == 0 { + return None + } + } }; + let hash = hashes_to_request.first()?; - if hashes.msg_version().is_eth68() { - return self.pack_hashes_eth68(hashes, peer_id) - } - self.pack_hashes_eth66(hashes) + // pop hash that is loaded in request buffer from cache of hashes pending fetch + drop(hashes_pending_fetch_iter); + _ = self.hashes_pending_fetch.remove(hash); + + idle_peer } - /// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 66. - /// If necessary, takes hashes from buffer for which peer is listed as fallback peer. + /// Packages hashes for a [`GetPooledTxRequest`] up to limit. Returns left over hashes. Takes + /// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request. /// /// Returns left over hashes. - pub(super) fn pack_hashes_eth66(&mut self, hashes: &mut ValidTxHashes) -> ValidTxHashes { - if hashes.len() <= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { - return ValidTxHashes::empty_eth66() + pub(super) fn pack_request( + &mut self, + hashes_to_request: &mut RequestTxHashes, + hashes_from_announcement: ValidAnnouncementData, + ) -> RequestTxHashes { + if hashes_from_announcement.msg_version().is_eth68() { + return self.pack_request_eth68(hashes_to_request, hashes_from_announcement) } - let surplus_hashes = hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES - 1); - - ValidTxHashes::new_eth66(surplus_hashes) + self.pack_request_eth66(hashes_to_request, hashes_from_announcement) } - /// Evaluates wether or not to include a hash in a `GetPooledTransactions` version eth68 - /// request, based on the size of the transaction and the accumulated size of the - /// corresponding `PooledTransactions` response. + /// Packages hashes for a [`GetPooledTxRequest`] from an + /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement up to limit as defined by protocol + /// version 68. Takes a [`RequestTxHashes`] buffer as parameter for filling with hashes to + /// request. /// - /// Returns `true` if hash is included in request. If there is still space in the respective - /// response but not enough for the transaction of given hash, `false` is returned. - fn include_eth68_hash(&self, acc_size_response: &mut usize, hash: TxHash) -> bool { - debug_assert!( - self.eth68_meta.peek(&hash).is_some(), - "can't find eth68 metadata for `%hash` that should be of version eth68, broken invariant `@eth68_meta` and `@self`, -`%hash`: {}, -`@self`: {:?}", - hash, self - ); + /// Returns left over hashes. + /// + /// Loops through hashes passed as parameter and checks if a hash fits in the expected + /// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request + /// and expected response size is accumulated. 
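// --- Illustrative sketch, not part of the applied diff -------------------------------------
// Minimal form of the size-aware packing described above: accumulate announced encoded
// lengths into the request while the expected response stays under a soft byte limit;
// everything that does not fit becomes surplus. The real method additionally gives an
// oversized first tx its own request and stops early once the remaining head-room is smaller
// than a small legacy tx.
fn _sketch_pack_request_eth68(
    announced: Vec<(u64, usize)>, // (hash, announced encoded length in bytes)
    soft_limit_response_bytes: usize,
) -> (Vec<u64>, Vec<u64>) {
    let (mut request, mut surplus, mut acc_size) = (Vec::new(), Vec::new(), 0usize);
    for (hash, size) in announced {
        if acc_size + size <= soft_limit_response_bytes {
            acc_size += size;
            request.push(hash);
        } else {
            surplus.push(hash);
        }
    }
    (request, surplus)
}
// --------------------------------------------------------------------------------------------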
+ pub(super) fn pack_request_eth68( + &mut self, + hashes_to_request: &mut RequestTxHashes, + hashes_from_announcement: ValidAnnouncementData, + ) -> RequestTxHashes { + let mut acc_size_response = 0; + let hashes_from_announcement_len = hashes_from_announcement.len(); + + let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter(); - if let Some(size) = self.eth68_meta.peek(&hash) { - let next_acc_size = *acc_size_response + size; + if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() { + hashes_to_request.push(hash); - if next_acc_size <= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE { + // tx is really big, pack request with single tx + if size >= DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST { + return hashes_from_announcement_iter.collect::() + } else { + acc_size_response = size; + } + } + + let mut surplus_hashes = RequestTxHashes::with_capacity(hashes_from_announcement_len - 1); + + 'fold_size: loop { + let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break }; + + let Some((_ty, size)) = metadata else { + unreachable!("this method is called upon reception of an eth68 announcement") + }; + + let next_acc_size = acc_size_response + size; + + if next_acc_size <= DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST { // only update accumulated size of tx response if tx will fit in without exceeding // soft limit - *acc_size_response = next_acc_size; - return true + acc_size_response = next_acc_size; + hashes_to_request.push(hash) + } else { + surplus_hashes.push(hash) + } + + let free_space = + DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST - acc_size_response; + + if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED { + break 'fold_size } } - false + surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash)); + surplus_hashes.shrink_to_fit(); + + surplus_hashes } - /// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 68. - /// If necessary, takes hashes from buffer for which peer is listed as fallback peer. Returns - /// left over hashes. + /// Packages hashes for a [`GetPooledTxRequest`] from an + /// [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcement up to limit as defined by + /// protocol version 66. Takes a [`RequestTxHashes`] buffer as parameter for filling with + /// hashes to request. /// - /// 1. Loops through hashes passed as parameter, calculating the accumulated size of the - /// response that this request would generate if filled with requested hashes. - /// 2.a. All hashes fit in response and there is no more space. Returns empty vector. - /// 2.b. Some hashes didn't fit in and there is no more space. Returns surplus hashes. - /// 2.c. All hashes fit in response and there is still space. Surplus hashes = empty vector. - /// 2.d. Some hashes didn't fit in but there is still space. Surplus hashes != empty vector. - /// 3. Try to fill remaining space with hashes from buffer. - /// 4. Return surplus hashes. - pub(super) fn pack_hashes_eth68( + /// Returns left over hashes. 
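// --- Illustrative sketch, not part of the applied diff -------------------------------------
// The eth66 variant has no size metadata, so packing is by count only: keep at most the soft
// limit number of hashes in the request and return the rest as surplus. The parameter is a
// stand-in for SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST (256).
fn _sketch_pack_request_eth66(
    mut hashes: Vec<u64>,
    soft_limit_count: usize,
) -> (Vec<u64>, Vec<u64>) {
    if hashes.len() <= soft_limit_count {
        return (hashes, Vec::new())
    }
    let surplus = hashes.split_off(soft_limit_count);
    (hashes, surplus)
}
// --------------------------------------------------------------------------------------------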
+ pub(super) fn pack_request_eth66( &mut self, - hashes: &mut ValidTxHashes, - peer_id: PeerId, - ) -> ValidTxHashes { - if let Some(hash) = hashes.first() { - if let Some(size) = self.eth68_meta.get(hash) { - if *size >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE { - let surplus_hashes = hashes.split_off(1); - return ValidTxHashes::new_eth68(surplus_hashes) - } - } - } + hashes_to_request: &mut RequestTxHashes, + hashes_from_announcement: ValidAnnouncementData, + ) -> RequestTxHashes { + let (mut request_hashes, _version) = hashes_from_announcement.into_request_hashes(); + if request_hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST { + *hashes_to_request = request_hashes; + + RequestTxHashes::default() + } else { + let surplus_hashes = request_hashes + .split_off(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST - 1); - let mut acc_size_response = 0; - let mut surplus_hashes = vec![]; + *hashes_to_request = request_hashes; - hashes.retain(|&hash| match self.include_eth68_hash(&mut acc_size_response, hash) { - true => true, - false => { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"), - acc_size_response=acc_size_response, - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE= - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, - "no space for hash in `GetPooledTransactions` request to peer" - ); + RequestTxHashes::new(surplus_hashes) + } + } - surplus_hashes.push(hash); - false + /// Tries to buffer hashes for retry. + pub(super) fn buffer_hashes_for_retry( + &mut self, + mut hashes: RequestTxHashes, + peer_failed_to_serve: &PeerId, + ) { + // It could be that the txns have been received over broadcast in the time being. Remove + // the peer as fallback peer so it isn't request again for these hashes. + hashes.retain(|hash| { + if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) { + entry.fallback_peers_mut().remove(peer_failed_to_serve); + return true } + // tx has been seen over broadcast in the time it took for the request to resolve + false }); - ValidTxHashes::new_eth68(surplus_hashes) - } - - pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: ValidTxHashes) { - // It could be that the txns have been received over broadcast in the time being. - hashes.retain(|hash| self.unknown_hashes.get(hash).is_some()); - self.buffer_hashes(hashes, None) } /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be - /// passed as `fallback_peer` parameter! Hashes that have been re-requested - /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. - pub(super) fn buffer_hashes(&mut self, hashes: ValidTxHashes, fallback_peer: Option) { + /// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use + /// [`TransactionFetcher::buffer_hashes_for_retry`]. Hashes that have been re-requested + /// [`DEFAULT_MAX_RETRIES`], are dropped. 
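// --- Illustrative sketch, not part of the applied diff -------------------------------------
// Core of the retry bookkeeping described above: re-buffering a hash after a failed request
// consumes one retry and the hash is dropped once the maximum is reached, while buffering a
// hash together with a fresh fallback peer leaves the counter untouched.
fn _sketch_keep_hash_in_pending_fetch(
    retries: &mut u8,
    max_retries: u8,
    fresh_fallback_peer: bool,
) -> bool {
    if fresh_fallback_peer {
        return true // the peer has not requested this hash yet, no retry consumed
    }
    if *retries >= max_retries {
        return false // retry limit reached, drop the hash
    }
    *retries += 1;
    true
}
// --------------------------------------------------------------------------------------------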
+ pub(super) fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option) { let mut max_retried_and_evicted_hashes = vec![]; - let msg_version = hashes.msg_version(); - for hash in hashes.into_iter() { - // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 debug_assert!( - self.unknown_hashes.peek(&hash).is_some(), + self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_some(), "`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, `%hash`: {hash}, `@self`: {self:?}", ); - let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; + let Some(TxFetchMetadata { retries, fallback_peers, .. }) = + self.hashes_fetch_inflight_and_pending_fetch.get(&hash) + else { + return + }; if let Some(peer_id) = fallback_peer { // peer has not yet requested hash - peers.insert(peer_id); + fallback_peers.insert(peer_id); } else { - // peer in caller's context has requested hash and is hence not eligible as - // fallback peer. - if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { + if *retries >= DEFAULT_MAX_RETRIES { debug!(target: "net::tx", hash=%hash, retries=retries, - msg_version=%msg_version, "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" ); max_retried_and_evicted_hashes.push(hash); - continue + continue; } *retries += 1; } - if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) { + if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash) + { max_retried_and_evicted_hashes.push(evicted_hash); } } @@ -298,6 +324,77 @@ impl TransactionFetcher { self.remove_from_unknown_hashes(max_retried_and_evicted_hashes); } + /// Tries to request hashes pending fetch. + /// + /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of + /// the request by checking the transactions seen by the peer against the buffer. + pub(super) fn on_fetch_pending_hashes( + &mut self, + peers: &HashMap, + has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, + metrics_increment_egress_peer_channel_full: impl FnOnce(), + ) { + let mut hashes_to_request = RequestTxHashes::with_capacity(32); + let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id); + + // budget to look for an idle peer before giving up + let budget_find_idle_fallback_peer = self + .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports); + + let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash( + &mut hashes_to_request, + is_session_active, + budget_find_idle_fallback_peer, + ) else { + // no peers are idle or budget is depleted + return + }; + // peer should always exist since `is_session_active` already checked + let Some(peer) = peers.get(&peer_id) else { return }; + let conn_eth_version = peer.version; + + // fill the request with more hashes pending fetch that have been announced by the peer. + // the search for more hashes is done with respect to the given budget, which determines + // how many hashes to loop through before giving up. if no more hashes are found wrt to + // the budget, the single hash that was taken out of the cache above is sent in a request. 
+ let budget_fill_request = self + .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer( + &has_capacity_wrt_pending_pool_imports, + ); + + self.fill_request_from_hashes_pending_fetch( + &mut hashes_to_request, + peer.seen_transactions.maybe_pending_transaction_hashes(), + budget_fill_request, + ); + + // free unused memory + hashes_to_request.shrink_to_fit(); + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=?*hashes_to_request, + conn_eth_version=%conn_eth_version, + "requesting hashes that were stored pending fetch from peer" + ); + + // request the buffered missing transactions + if let Some(failed_to_request_hashes) = self.request_transactions_from_peer( + hashes_to_request, + peer, + metrics_increment_egress_peer_channel_full, + ) { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + failed_to_request_hashes=?failed_to_request_hashes, + conn_eth_version=%conn_eth_version, + "failed sending request to peer's session, buffering hashes" + ); + + self.buffer_hashes(failed_to_request_hashes, Some(peer_id)); + } + } + /// Removes the provided transaction hashes from the inflight requests set. /// /// This is called when we receive full transactions that are currently scheduled for fetching. @@ -309,20 +406,47 @@ impl TransactionFetcher { self.remove_from_unknown_hashes(hashes) } - pub(super) fn filter_unseen_and_pending_hashes( + /// Filters out hashes that have been seen before. For hashes that have already been seen, the + /// peer is added as fallback peer. + pub(super) fn filter_unseen_and_pending_hashes( &mut self, - new_announced_hashes: &mut T, - peer_id: PeerId, + new_announced_hashes: &mut ValidAnnouncementData, + peer_id: &PeerId, is_session_active: impl Fn(PeerId) -> bool, + client_version: &str, ) { + #[cfg(not(debug_assertions))] + let mut previously_unseen_hashes_count = 0; + #[cfg(debug_assertions)] + let mut previously_unseen_hashes = Vec::with_capacity(new_announced_hashes.len() / 4); + let msg_version = new_announced_hashes.msg_version(); // filter out inflight hashes, and register the peer as fallback for all inflight hashes - new_announced_hashes.retain_by_hash(|hash| { + new_announced_hashes.retain(|hash, metadata| { // occupied entry - if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(&hash) { + if let Some(TxFetchMetadata{ref mut fallback_peers, tx_encoded_length: ref mut previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) { + // update size metadata if available + if let Some((_ty, size)) = metadata { + if let Some(prev_size) = previously_seen_size { + // check if this peer is announcing a different size than a previous peer + if size != prev_size { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=%hash, + size=size, + previously_seen_size=previously_seen_size, + client_version=%client_version, + "peer announced a different size for tx, this is especially worrying if one size is much bigger..." + ); + } + } + // believe the most recent peer to announce tx + *previously_seen_size = Some(*size); + } + // hash has been seen but is not inflight - if self.buffered_hashes.remove(&hash) { + if self.hashes_pending_fetch.remove(hash) { return true } // hash has been seen and is in flight. store peer as fallback peer. 
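// --- Illustrative sketch, not part of the applied diff -------------------------------------
// The per-hash decision applied by `filter_unseen_and_pending_hashes`, written out as a tiny
// standalone function: unseen hashes and hashes pending fetch are kept in the announcement
// (they can be requested from this peer), while hashes with an inflight request are filtered
// out and only register the announcing peer as a fallback peer. The enum is hypothetical.
enum _SketchHashState {
    Unseen,
    PendingFetch,
    InflightRequest,
}
fn _sketch_keep_announced_hash(state: _SketchHashState) -> bool {
    match state {
        _SketchHashState::Unseen | _SketchHashState::PendingFetch => true,
        // already being fetched from another peer; remember this peer as a fallback instead
        _SketchHashState::InflightRequest => false,
    }
}
// --------------------------------------------------------------------------------------------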
@@ -330,13 +454,13 @@ impl TransactionFetcher { // remove any ended sessions, so that in case of a full cache, alive peers aren't // removed in favour of lru dead peers let mut ended_sessions = vec!(); - for &peer_id in backups.iter() { + for &peer_id in fallback_peers.iter() { if is_session_active(peer_id) { ended_sessions.push(peer_id); } } for peer_id in ended_sessions { - backups.remove(&peer_id); + fallback_peers.remove(&peer_id); } return false @@ -344,24 +468,25 @@ impl TransactionFetcher { // vacant entry - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - msg_version=%msg_version, - "new hash seen in announcement by peer" - ); + #[cfg(not(debug_assertions))] + { + previously_unseen_hashes_count += 1; + } + #[cfg(debug_assertions)] + previously_unseen_hashes.push(*hash); // todo: allow `MAX_ALTERNATIVE_PEERS_PER_TX` to be zero - let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).expect("MAX_ALTERNATIVE_PEERS_PER_TX should be non-zero"); + let limit = NonZeroUsize::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS.into()).expect("MAX_ALTERNATIVE_PEERS_PER_TX should be non-zero"); - if self.unknown_hashes.get_or_insert(*hash, || - (0, LruCache::new(limit)) + if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, || + TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(limit), tx_encoded_length: None} ).is_none() { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), hash=%hash, msg_version=%msg_version, + client_version=%client_version, "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" ); @@ -370,34 +495,48 @@ impl TransactionFetcher { true }); + #[cfg(not(debug_assertions))] + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + previously_unseen_hashes_count=previously_unseen_hashes_count, + msg_version=%msg_version, + client_version=%client_version, + "received previously unseen hashes in announcement from peer" + ); + + #[cfg(debug_assertions)] trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), msg_version=%msg_version, + client_version=%client_version, + previously_unseen_hashes_len=?previously_unseen_hashes.len(), + previously_unseen_hashes=?previously_unseen_hashes, "received previously unseen hashes in announcement from peer" ); } - /// Requests the missing transactions from the announced hashes of the peer. Returns the - /// requested hashes if concurrency limit is reached or if the request fails to send over the - /// channel to the peer's session task. + /// Requests the missing transactions from the previously unseen announced hashes of the peer. + /// Returns the requested hashes if the request concurrency limit is reached or if the request + /// fails to send over the channel to the peer's session task. /// /// This filters all announced hashes that are already in flight, and requests the missing, /// while marking the given peer as an alternative peer for the hashes that are already in /// flight. 
pub(super) fn request_transactions_from_peer( &mut self, - new_announced_hashes: ValidTxHashes, + new_announced_hashes: RequestTxHashes, peer: &Peer, metrics_increment_egress_peer_channel_full: impl FnOnce(), - ) -> Option { + ) -> Option { let peer_id: PeerId = peer.request_tx.peer_id; + let conn_eth_version = peer.version; - if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS { + if self.active_peers.len() >= self.info.max_inflight_requests { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), new_announced_hashes=?*new_announced_hashes, - msg_version=%new_announced_hashes.msg_version(), - limit=MAX_CONCURRENT_TX_REQUESTS, + conn_eth_version=%conn_eth_version, + max_inflight_transaction_requests=self.info.max_inflight_requests, "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer" ); return Some(new_announced_hashes) @@ -407,18 +546,18 @@ impl TransactionFetcher { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), new_announced_hashes=?*new_announced_hashes, - msg_version=%new_announced_hashes.msg_version(), + conn_eth_version=%conn_eth_version, "failed to cache active peer in schnellru::LruMap, dropping request to peer" ); return Some(new_announced_hashes) }; - if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { + if *inflight_count >= DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), new_announced_hashes=?*new_announced_hashes, - msg_version=%new_announced_hashes.msg_version(), - limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER, + conn_eth_version=%conn_eth_version, + MAX_CONCURRENT_TX_REQUESTS_PER_PEER=DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER, "limit for concurrent `GetPooledTransactions` requests per peer reached" ); return Some(new_announced_hashes) @@ -429,7 +568,7 @@ impl TransactionFetcher { debug_assert!( || -> bool { for hash in new_announced_hashes.iter() { - if self.buffered_hashes.contains(hash) { + if self.hashes_pending_fetch.contains(hash) { return false } } @@ -456,7 +595,7 @@ impl TransactionFetcher { let req = req.into_get_pooled_transactions().expect("is get pooled tx"); metrics_increment_egress_peer_channel_full(); - return Some(ValidTxHashes::new(req.0, new_announced_hashes.msg_version())) + return Some(RequestTxHashes::new(req.0)) } } } else { @@ -471,180 +610,178 @@ impl TransactionFetcher { None } - /// Tries to fill request with eth68 hashes so that the respective tx response is at its size - /// limit. It does so by taking buffered eth68 hashes for which peer is listed as fallback - /// peer. A mutable reference to a list of hashes to request is passed as parameter. - /// - /// If a single transaction exceeds the soft limit, it's fetched in its own request. Otherwise - /// the following applies. + /// Tries to fill request with hashes pending fetch so that the expected [`PooledTransactions`] + /// response is full enough. A mutable reference to a list of hashes to request is passed as + /// parameter. A budget is passed as parameter, this ensures that the node stops searching + /// for more hashes after the budget is depleted. Under bad network conditions, the cache of + /// hashes pending fetch may become very full for a while. As the node recovers, the hashes + /// pending fetch cache should get smaller. The budget should aim to be big enough to loop + /// through all buffered hashes in good network conditions. 
/// - /// Loops through buffered hashes and does: + /// The request hashes buffer is filled as if it's an eth68 request, i.e. smartly assemble + /// the request based on expected response size. For any hash missing size metadata, it is + /// guessed at [`AVERAGE_BYTE_SIZE_TX_ENCODED`]. + + /// Loops through hashes pending fetch and does: /// - /// 1. Check if acc size exceeds limit or if hashes count exceeds limit, if so stop looping. - /// 2. Check if this buffered hash is an eth68 hash, else skip to next iteration. - /// 3. Check if hash can be included with respect to size metadata and acc size copy. - /// 4. Check if peer is fallback peer for hash and remove, else skip to next iteration. - /// 4. Add hash to hashes list parameter. - /// 5. Overwrite eth68 acc size with copy. - pub(super) fn fill_eth68_request_for_peer( + /// 1. Check if a hash pending fetch is seen by peer. + /// 2. Optimistically include the hash in the request. + /// 3. Accumulate expected total response size. + /// 4. Check if acc size and hashes count is at limit, if so stop looping. + /// 5. Remove hashes to request from cache of hashes pending fetch. + pub(super) fn fill_request_from_hashes_pending_fetch( &mut self, - hashes: &mut Vec, - peer_id: PeerId, - acc_size_response: &mut usize, + hashes_to_request: &mut RequestTxHashes, + seen_hashes: &LruCache, + mut budget_fill_request: Option, // check max `budget` lru pending hashes ) { - if *acc_size_response >= SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE / 2 { + let Some(hash) = hashes_to_request.first() else { return }; + + let mut acc_size_response = self + .hashes_fetch_inflight_and_pending_fetch + .get(hash) + .and_then(|entry| entry.tx_encoded_len()) + .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED); + + // if request full enough already, we're satisfied, send request for single tx + if acc_size_response >= + DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES + { return } - // all hashes included in request and there is still a lot of space - - debug_assert!( + // try to fill request by checking if any other hashes pending fetch (in lru order) are + // also seen by peer + for hash in self.hashes_pending_fetch.iter() { + // 1. Check if a hash pending fetch is seen by peer. + if !seen_hashes.contains(hash) { + continue; + }; + + // 2. Optimistically include the hash in the request. + hashes_to_request.push(*hash); + + // 3. Accumulate expected total response size. + let size = self + .hashes_fetch_inflight_and_pending_fetch + .get(hash) + .and_then(|entry| entry.tx_encoded_len()) + .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED); + + acc_size_response += size; + + // 4. Check if acc size or hashes count is at limit, if so stop looping. 
+ // if expected response is full enough or the number of hashes in the request is + // enough, we're satisfied + if acc_size_response >= + DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES || + hashes_to_request.len() > + DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES { - let mut acc_size = 0; - for &hash in hashes.iter() { - _ = self.include_eth68_hash(&mut acc_size, hash); - } - acc_size == *acc_size_response - }, - "an eth68 request is being assembled and caller has miscalculated accumulated size of corresponding transactions response, broken invariant `%acc_size_response` and `%hashes`, -`%acc_size_response`: {:?}, -`%hashes`: {:?}, -`@self`: {:?}", - acc_size_response, hashes, self - ); - - for hash in self.buffered_hashes.iter() { - // fill request to 2/3 of the soft limit for the response size, or until the number of - // hashes reaches the soft limit number for a request (like in eth66), whatever - // happens first - if hashes.len() > GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { break } - // copy acc size - let mut next_acc_size = *acc_size_response; - - // 1. Check acc size against limit, if so stop looping. - if next_acc_size >= 2 * SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE / 3 { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - acc_size_eth68_response=acc_size_response, // no change acc size - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE= - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, - "request to peer full" - ); - - break - } - // 2. Check if this buffered hash is an eth68 hash, else skip to next iteration. - if self.eth68_meta.get(hash).is_none() { - continue - } - // 3. Check if hash can be included with respect to size metadata and acc size copy. - // - // mutates acc size copy - if !self.include_eth68_hash(&mut next_acc_size, *hash) { - continue - } - - debug_assert!( - self.unknown_hashes.get(hash).is_some(), - "can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, -`%hash`: {}, -`@self`: {:?}", - hash, self - ); - - if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { - // 4. Check if peer is fallback peer for hash and remove, else skip to next - // iteration. - // - // upgrade this peer from fallback peer, soon to be active peer with inflight - // request. since 1 retry per peer per tx hash on this tx fetcher layer, remove - // peer. - if fallback_peers.remove(&peer_id) { - // 4. Add hash to hashes list parameter. - hashes.push(*hash); - // 5. Overwrite eth68 acc size with copy. - *acc_size_response = next_acc_size; - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - acc_size_eth68_response=acc_size_response, - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE= - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, - "found buffered hash for request to peer" - ); + if let Some(ref mut bud) = budget_fill_request { + *bud = bud.saturating_sub(1); + if *bud == 0 { + return } } } - // remove hashes that will be included in request from buffer - for hash in hashes { - self.buffered_hashes.remove(hash); + // 5. Remove hashes to request from cache of hashes pending fetch. + for hash in hashes_to_request.iter() { + self.hashes_pending_fetch.remove(hash); } } - /// Tries to fill request with eth66 hashes so that the respective tx response is at its size - /// limit. 
It does so by taking buffered hashes for which peer is listed as fallback peer. A - /// mutable reference to a list of hashes to request is passed as parameter. - /// - /// Loops through buffered hashes and does: - /// - /// 1. Check if this buffered hash is an eth66 hash, else skip to next iteration. - /// 2. Check hashes count in request, if max reached stop looping. - /// 3. Check if peer is fallback peer for hash and remove, else skip to next iteration. - /// 4. Add hash to hashes list parameter. This increases length i.e. hashes count. + /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns + /// `false` if [`TransactionFetcher`] is operating close to full capacity. + pub(super) fn has_capacity_for_fetching_pending_hashes(&self) -> bool { + let info = &self.info; + + self.has_capacity(info.max_inflight_requests) + } + + /// Returns `true` if the number of inflight requests are under a given tolerated max. + fn has_capacity(&self, max_inflight_requests: usize) -> bool { + self.inflight_requests.len() <= max_inflight_requests + } + + /// Returns the limit to enforce when looking for any pending hash with an idle fallback peer. /// - /// Removes hashes included in request from buffer. - pub(super) fn fill_eth66_request_for_peer( - &mut self, - hashes: &mut Vec, - peer_id: PeerId, - ) { - for hash in self.buffered_hashes.iter() { - // 1. Check hashes count in request. - if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { - break - } - // 2. Check if this buffered hash is an eth66 hash. - if self.eth68_meta.get(hash).is_some() { - continue - } + /// Returns `Some(limit)` if [`TransactionFetcher`] and the + /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full + /// capacity. Returns `None`, unlimited, if they are not that busy. + pub(super) fn search_breadth_budget_find_idle_fallback_peer( + &self, + has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, + ) -> Option { + let info = &self.info; - debug_assert!( - self.unknown_hashes.get(hash).is_some(), - "can't find buffered `%hash` in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, -`%hash`: {}, -`@self`: {:?}", - hash, self + let tx_fetcher_has_capacity = self.has_capacity( + info.max_inflight_requests / + DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER, + ); + let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports( + DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER, + ); + + if tx_fetcher_has_capacity && tx_pool_has_capacity { + // unlimited search breadth + None + } else { + // limited breadth of search for idle peer + let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER; + + trace!(target: "net::tx", + inflight_requests=self.inflight_requests.len(), + max_inflight_transaction_requests=info.max_inflight_requests, + hashes_pending_fetch=self.hashes_pending_fetch.len(), + limit=limit, + "search breadth limited in search for idle fallback peer for some hash pending fetch" ); - if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { - // 3. Check if peer is fallback peer for hash and remove. - // - // upgrade this peer from fallback peer, soon to be active peer with inflight - // request. since 1 retry per peer per tx hash on this tx fetcher layer, remove - // peer. - if fallback_peers.remove(&peer_id) { - // 4. Add hash to hashes list parameter. 
- hashes.push(*hash); - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hash=%hash, - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE= - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, - "found buffered hash for request to peer" - ); - } - } + Some(limit) } + } - // remove hashes that will be included in request from buffer - for hash in hashes { - self.buffered_hashes.remove(hash); + /// Returns the limit to enforce when looking for the intersection between hashes announced by + /// peer and hashes pending fetch. + /// + /// Returns `Some(limit)` if [`TransactionFetcher`] and the + /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full + /// capacity. Returns `None`, unlimited, if they are not that busy. + pub(super) fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer( + &self, + has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, + ) -> Option { + let info = &self.info; + + let tx_fetcher_has_capacity = self.has_capacity( + info.max_inflight_requests / + DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION, + ); + let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports( + DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION, + ); + + if tx_fetcher_has_capacity && tx_pool_has_capacity { + // unlimited search breadth + None + } else { + // limited breadth of search for idle peer + let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH; + + trace!(target: "net::tx", + inflight_requests=self.inflight_requests.len(), + max_inflight_transaction_requests=self.info.max_inflight_requests, + hashes_pending_fetch=self.hashes_pending_fetch.len(), + limit=limit, + "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch" + ); + + Some(limit) } } } @@ -660,7 +797,7 @@ impl Stream for TransactionFetcher { if let Poll::Ready(Some(response)) = res { // update peer activity, requests for buffered hashes can only be made to idle // fallback peers - let GetPooledTxResponse { peer_id, .. 
} = response; + let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; debug_assert!( self.active_peers.get(&peer_id).is_some(), @@ -670,9 +807,7 @@ impl Stream for TransactionFetcher { peer_id, self ); - self.decrement_inflight_request_count_for(peer_id); - - let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; + self.decrement_inflight_request_count_for(&peer_id); return match result { Ok(Ok(transactions)) => { @@ -686,9 +821,10 @@ impl Stream for TransactionFetcher { } true }); + self.remove_from_unknown_hashes(fetched); // buffer left over hashes - self.buffer_hashes_for_retry(requested_hashes); + self.buffer_hashes_for_retry(requested_hashes, &peer_id); Poll::Ready(Some(FetchEvent::TransactionsFetched { peer_id, @@ -696,11 +832,11 @@ impl Stream for TransactionFetcher { })) } Ok(Err(req_err)) => { - self.buffer_hashes_for_retry(requested_hashes); + self.buffer_hashes_for_retry(requested_hashes, &peer_id); Poll::Ready(Some(FetchEvent::FetchError { peer_id, error: req_err })) } Err(_) => { - self.buffer_hashes_for_retry(requested_hashes); + self.buffer_hashes_for_retry(requested_hashes, &peer_id); // request channel closed/dropped Poll::Ready(Some(FetchEvent::FetchError { peer_id, @@ -717,22 +853,46 @@ impl Stream for TransactionFetcher { impl Default for TransactionFetcher { fn default() -> Self { Self { - active_peers: LruMap::new(MAX_CONCURRENT_TX_REQUESTS), + active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS), inflight_requests: Default::default(), - buffered_hashes: LruCache::new( - NonZeroUsize::new(MAX_CAPACITY_BUFFERED_HASHES) + hashes_pending_fetch: LruCache::new( + NonZeroUsize::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH) .expect("buffered cache limit should be non-zero"), ), - unknown_hashes: LruMap::new_unlimited(), - eth68_meta: LruMap::new_unlimited(), + hashes_fetch_inflight_and_pending_fetch: LruMap::new_unlimited(), filter_valid_hashes: Default::default(), + info: TransactionFetcherInfo::default(), } } } +/// Metadata of a transaction hash that is yet to be fetched. +#[derive(Debug, Constructor)] +pub(super) struct TxFetchMetadata { + /// The number of times a request attempt has been made for the hash. + retries: u8, + /// Peers that have announced the hash, but to which a request attempt has not yet been made. + fallback_peers: LruCache, + /// Size metadata of the transaction if it has been seen in an eth68 announcement. + // todo: store all seen sizes as a `(size, peer_id)` tuple to catch peers that respond with + // another size tx than they announced. alt enter in request (won't catch peers announcing + // wrong size for requests assembled from hashes pending fetch if stored in request fut) + tx_encoded_length: Option, +} + +impl TxFetchMetadata { + pub fn fallback_peers_mut(&mut self) -> &mut LruCache { + &mut self.fallback_peers + } + + pub fn tx_encoded_len(&self) -> Option { + self.tx_encoded_length + } +} + /// Represents possible events from fetching transactions. #[derive(Debug)] -pub(super) enum FetchEvent { +pub(crate) enum FetchEvent { /// Triggered when transactions are successfully fetched. TransactionsFetched { /// The ID of the peer from which transactions were fetched. 
@@ -749,18 +909,19 @@ pub(super) enum FetchEvent { }, } -/// An inflight request for `PooledTransactions` from a peer +/// An inflight request for [`PooledTransactions`] from a peer pub(super) struct GetPooledTxRequest { peer_id: PeerId, /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: ValidTxHashes, + requested_hashes: RequestTxHashes, response: oneshot::Receiver>, } pub(super) struct GetPooledTxResponse { peer_id: PeerId, - /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: ValidTxHashes, + /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a + /// subset of requested hashes. + requested_hashes: RequestTxHashes, result: Result, RecvError>, } @@ -775,7 +936,7 @@ impl GetPooledTxRequestFut { #[inline] fn new( peer_id: PeerId, - requested_hashes: ValidTxHashes, + requested_hashes: RequestTxHashes, response: oneshot::Receiver>, ) -> Self { Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } @@ -801,62 +962,192 @@ impl Future for GetPooledTxRequestFut { } } +/// Tracks stats about the [`TransactionFetcher`]. +#[derive(Debug)] +pub struct TransactionFetcherInfo { + /// Currently active outgoing [`GetPooledTransactions`] requests. + pub(super) max_inflight_requests: usize, +} + +impl TransactionFetcherInfo { + pub fn new(max_inflight_transaction_requests: usize) -> Self { + Self { max_inflight_requests: max_inflight_transaction_requests } + } +} + +impl Default for TransactionFetcherInfo { + fn default() -> Self { + Self::new(DEFAULT_MAX_COUNT_INFLIGHT_REQUESTS_ON_FETCH_PENDING_HASHES) + } +} + #[cfg(test)] mod test { + use std::collections::HashSet; + + use reth_eth_wire::EthVersion; use reth_primitives::B256; - use crate::transactions::tests::default_cache; + use crate::transactions::tests::{default_cache, new_mock_session}; use super::*; #[test] - fn pack_eth68_request_surplus_hashes() { + fn pack_eth68_request() { reth_tracing::init_test_tracing(); let tx_fetcher = &mut TransactionFetcher::default(); - let peer_id = PeerId::new([1; 64]); - let eth68_hashes = [ B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32]), - B256::from_slice(&[6; 32]), ]; let eth68_hashes_sizes = [ - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE - 4, - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, // this one will not fit - 2, // this one will fit - 3, // but now this one won't - 2, /* this one will, no more - * txns - * will - * fit - * after this */ - 1, + DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST - 2, + DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_PACK_GET_POOLED_TRANSACTIONS_REQUEST, /* this one will + * not fit */ + 2, + 9, // this one won't + 2, ]; - // load unseen hashes in reverse order so index 0 in seen_eth68_hashes and - // seen_eth68_hashes_sizes is lru! 
+ // possible included index combinations are + // (i) 0 and 2 + // (ii) 0 and 4 + // (iii) 1 + // (iv) 2, 3 and 4 + let possible_outcome_1 = + [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::>(); + let possible_outcome_2 = + [eth68_hashes[0], eth68_hashes[4]].into_iter().collect::>(); + let possible_outcome_3 = [eth68_hashes[1]].into_iter().collect::>(); + let possible_outcome_4 = + [eth68_hashes[2], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::>(); + + let possible_outcomes = + [possible_outcome_1, possible_outcome_2, possible_outcome_3, possible_outcome_4]; + + let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3); + let mut valid_announcement_data = ValidAnnouncementData::empty_eth68(); + for i in 0..eth68_hashes.len() { + valid_announcement_data.insert(eth68_hashes[i], Some((0, eth68_hashes_sizes[i]))); + } + let surplus_eth68_hashes = + tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data); - for i in (0..6).rev() { - tx_fetcher.unknown_hashes.insert(eth68_hashes[i], (0, default_cache())); - tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]); + let combo_surplus_hashes = surplus_eth68_hashes.into_iter().collect::>(); + for combo in possible_outcomes.clone() { + assert_ne!(combo, combo_surplus_hashes) } - let mut eth68_hashes_to_request = ValidTxHashes::new_eth68(eth68_hashes.clone().to_vec()); - let surplus_eth68_hashes = - tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id); + let combo_hashes_to_request = eth68_hashes_to_request.into_iter().collect::>(); + + let mut combo_match = false; + for combo in possible_outcomes { + if combo == combo_hashes_to_request { + combo_match = true; + } + } + + assert!(combo_match) + } + + #[tokio::test] + async fn test_on_fetch_pending_hashes() { + reth_tracing::init_test_tracing(); + + let tx_fetcher = &mut TransactionFetcher::default(); + + // RIG TEST + + // hashes that will be fetched because they are stored as pending fetch + let seen_hashes = [ + B256::from_slice(&[1; 32]), + B256::from_slice(&[2; 32]), + B256::from_slice(&[3; 32]), + B256::from_slice(&[4; 32]), + ]; + // + // txns 1-3 are small, all will fit in request. no metadata has been made available for + // hash 4, it has only been seen over eth66 conn, so average tx size will be assumed in + // filling request. 
+ let seen_eth68_hashes_sizes = [120, 158, 116]; + + // peer that will fetch seen hashes because they are pending fetch + let peer_1 = PeerId::new([1; 64]); + // second peer, won't do anything in this test + let peer_2 = PeerId::new([2; 64]); + + // add seen hashes to peers seen transactions + // + // get handle for peer_1's session to receive request for pending hashes + let (mut peer_1_data, mut peer_1_mock_session_rx) = + new_mock_session(peer_1, EthVersion::Eth66); + for hash in &seen_hashes { + peer_1_data.seen_transactions.seen_in_announcement(*hash); + } + let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66); + for hash in &seen_hashes { + peer_2_data.seen_transactions.seen_in_announcement(*hash); + } + let mut peers = HashMap::new(); + peers.insert(peer_1, peer_1_data); + peers.insert(peer_2, peer_2_data); + + // insert peer_2 as fallback peer for seen_hashes + let mut backups = default_cache(); + backups.insert(peer_2); + // insert seen_hashes into tx fetcher + for i in 0..3 { + let meta = TxFetchMetadata::new(0, backups.clone(), Some(seen_eth68_hashes_sizes[i])); + tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[i], meta); + } + let meta = TxFetchMetadata::new(0, backups.clone(), None); + tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[3], meta); + // + // insert pending hash without peer_1 as fallback peer, only with peer_2 as fallback peer + let hash_other = B256::from_slice(&[5; 32]); + tx_fetcher + .hashes_fetch_inflight_and_pending_fetch + .insert(hash_other, TxFetchMetadata::new(0, backups, None)); + tx_fetcher.hashes_pending_fetch.insert(hash_other); + + // add peer_1 as lru fallback peer for seen hashes + for hash in &seen_hashes { + tx_fetcher + .hashes_fetch_inflight_and_pending_fetch + .get(hash) + .unwrap() + .fallback_peers_mut() + .insert(peer_1); + } + + // mark seen hashes as pending fetch + for hash in &seen_hashes { + tx_fetcher.hashes_pending_fetch.insert(*hash); + } + + // seen hashes and the random hash from peer_2 are pending fetch + assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 5); + + // TEST + + tx_fetcher.on_fetch_pending_hashes(&peers, |_| true, || ()); + + // mock session of peer_1 receives request + let req = peer_1_mock_session_rx + .recv() + .await + .expect("peer session should receive request with buffered hashes"); + let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() }; + let GetPooledTransactions(requested_hashes) = request; assert_eq!( - surplus_eth68_hashes.into_hashes(), - vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5]) - ); - assert_eq!( - eth68_hashes_to_request.into_hashes(), - vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4]) - ); + requested_hashes.into_iter().collect::>(), + seen_hashes.into_iter().collect::>() + ) } } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4abcc5d81a4b..6c4f6d7592b0 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -9,7 +9,7 @@ //! hand, space remains, hashes that the peer has previously announced are taken out of buffered //! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the //! peer's session, this marks the peer as active with respect to -//! `fetcher::MAX_CONCURRENT_TX_REQUESTS_PER_PEER`. +//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`. //! //! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes` //! 
pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is @@ -37,8 +37,8 @@ use crate::{ use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ EthVersion, GetPooledTransactions, HandleAnnouncement, NewPooledTransactionHashes, - NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, - ValidTxHashes, + NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, + RequestTxHashes, Transactions, }; use reth_interfaces::{ p2p::error::{RequestError, RequestResult}, @@ -59,33 +59,28 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, num::NonZeroUsize, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, }; use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::{debug, trace}; +mod constants; mod fetcher; mod validation; -use fetcher::{FetchEvent, TransactionFetcher}; +use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; +pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; pub use validation::*; -/// Cache limit of transactions to keep track of for a single peer. -const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10; - -/// Soft limit for NewPooledTransactions -const SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET: usize = 4096; - -/// Soft limit for the message of full transactions in bytes. -const SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE: usize = 128 * 1024; - -/// Soft limit for the response size of a [`GetPooledTransactions`] message (128 KiB) in bytes. -/// Standard maximum response size is 2 MiB. See specs -/// -/// . -const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE: usize = 128 * 1024; +use self::constants::{ + tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE, + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, +}; /// The future for inserting a function into the pool pub type PoolImportFuture = Pin>> + Send + 'static>>; @@ -216,6 +211,8 @@ pub struct TransactionsManager { transactions_by_peers: HashMap>, /// Transactions that are currently imported into the `Pool` pool_imports: FuturesUnordered, + /// Stats on pending pool imports that help the node self-monitor. + pending_pool_imports_info: PendingPoolImportsInfo, /// All the connected peers. peers: HashMap, /// Send half for the command channel. @@ -228,9 +225,6 @@ pub struct TransactionsManager { transaction_events: UnboundedMeteredReceiver, /// TransactionsManager metrics metrics: TransactionsManagerMetrics, - /// Configures wether or not to handle hashes from an announcement that didn't fit in the - /// request. If set to `false`, hashes that don't fit will be dropped. 
- enable_tx_refetch: bool, } impl TransactionsManager { @@ -245,15 +239,27 @@ impl TransactionsManager { let network_events = network.event_listener(); let (command_tx, command_rx) = mpsc::unbounded_channel(); + let transaction_fetcher = TransactionFetcher::default(); + // install a listener for new pending transactions that are allowed to be propagated over // the network let pending = pool.pending_transactions_listener(); + let pending_pool_imports_info = + PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS); + + let metrics = TransactionsManagerMetrics::default(); + metrics + .capacity_inflight_requests + .increment(transaction_fetcher.info.max_inflight_requests as u64); + metrics + .capacity_pending_pool_imports + .increment(pending_pool_imports_info.max_pending_pool_imports as u64); Self { pool, network, network_events, - transaction_fetcher: Default::default(), + transaction_fetcher, transactions_by_peers: Default::default(), pool_imports: Default::default(), peers: Default::default(), @@ -264,8 +270,8 @@ impl TransactionsManager { from_network, NETWORK_POOL_TRANSACTIONS_SCOPE, ), - metrics: Default::default(), - enable_tx_refetch: false, + metrics, + pending_pool_imports_info, } } } @@ -287,10 +293,16 @@ where Pool: TransactionPool + 'static, { #[inline] - fn update_request_metrics(&self) { - self.metrics - .inflight_transaction_requests - .set(self.transaction_fetcher.inflight_requests.len() as f64); + fn update_fetch_metrics(&self) { + let tx_fetcher = &self.transaction_fetcher; + + self.metrics.inflight_transaction_requests.set(tx_fetcher.inflight_requests.len() as f64); + + let hashes_pending_fetch = tx_fetcher.hashes_pending_fetch.len() as f64; + let total_hashes = tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len() as f64; + + self.metrics.hashes_pending_fetch.set(hashes_pending_fetch); + self.metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch); } /// Request handler for an incoming request for transactions @@ -308,13 +320,14 @@ where let transactions = self.pool.get_pooled_transaction_elements( request.0, GetPooledTransactionLimit::ResponseSizeSoftLimit( - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_MESSAGE, + SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, ), ); // we sent a response at which point we assume that the peer is aware of the // transactions - peer.transactions.extend(transactions.iter().map(|tx| *tx.hash())); + peer.seen_transactions + .extend_seen_by_peer_and_in_pool(transactions.iter().map(|tx| *tx.hash())); let resp = PooledTransactions(transactions); let _ = response.send(Ok(resp)); @@ -382,7 +395,9 @@ where // transaction lists, before deciding whether or not to send full transactions to the // peer. for tx in to_propagate.iter() { - if peer.transactions.insert(tx.hash()) { + if !peer.seen_transactions.has_seen_transaction(&tx.hash()) { + peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash()); + hashes.push(tx); // Do not send full 4844 transaction hashes to peers. 
@@ -407,7 +422,7 @@ where
                 // enforce tx soft limit per message for the (unlikely) event the number of
                 // hashes exceeds it
                 new_pooled_hashes.truncate(
-                    SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET,
+                    SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
                 );
 
                 for hash in new_pooled_hashes.iter_hashes().copied() {
@@ -468,7 +483,9 @@ where
 
         // Iterate through the transactions to propagate and fill the hashes and full transaction
         for tx in to_propagate {
-            if peer.transactions.insert(tx.hash()) {
+            if !peer.seen_transactions.has_seen_transaction(&tx.hash()) {
+                peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash());
+
                 full_transactions.push(&tx);
             }
         }
@@ -514,7 +531,8 @@ where
         let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
         for tx in to_propagate {
-            if peer.transactions.insert(tx.hash()) {
+            if !peer.seen_transactions.has_seen_transaction(&tx.hash()) {
+                peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash());
                 hashes.push(&tx);
             }
         }
@@ -567,14 +585,7 @@ where
             return
         };
-
-        // keep track of the transactions the peer knows
-        let mut num_already_seen = 0;
-        for tx in msg.iter_hashes().copied() {
-            if !peer.transactions.insert(tx) {
-                num_already_seen += 1;
-            }
-        }
+        let client_version = peer.client_version.clone();
 
         // 1. filter out known hashes
         //
@@ -583,7 +594,25 @@ where
         // most hashes will be filtered out here since this the mempool protocol is a gossip
         // protocol, healthy peers will send many of the same hashes.
         //
-        self.pool.retain_unknown(&mut msg);
+        let already_known_by_pool = self.pool.retain_unknown(&mut msg);
+
+        // keep track of the transactions the peer knows
+        let mut num_already_seen = 0;
+        if let Some(pools_intersection) = already_known_by_pool {
+            for tx in pools_intersection.into_hashes() {
+                if peer.seen_transactions.has_seen_transaction(&tx) {
+                    num_already_seen += 1;
+                }
+                peer.seen_transactions.seen_by_peer_and_in_pool(tx);
+            }
+        }
+        for tx in msg.iter_hashes().copied() {
+            if peer.seen_transactions.has_seen_transaction(&tx) {
+                num_already_seen += 1;
+            }
+            peer.seen_transactions.seen_in_announcement(tx);
+        }
+
         if msg.is_empty() {
             // nothing to request
             return
@@ -593,7 +622,7 @@ where
         //
         // validates messages with respect to the given network, e.g. allowed tx types
         //
-        let mut hashes = match msg {
+        let mut valid_announcement_data = match msg {
             NewPooledTransactionHashes::Eth68(eth68_msg) => {
                 // validate eth68 announcement data
                 let (outcome, valid_data) =
@@ -602,29 +631,8 @@ where
                 if let FilterOutcome::ReportPeer = outcome {
                     self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
                 }
-                let hashes = valid_data.into_iter().map(|(hash, metadata)| {
-                    // cache eth68 metadata
-                    if let Some((_ty, size)) = metadata {
-                        // check if this peer is announcing a different size for an already seen
-                        // hash
-                        if let Some(previously_seen_size) = self.transaction_fetcher.eth68_meta.get(&hash) {
-                            if size != *previously_seen_size {
-                                // todo: store both sizes as a `(size, peer_id)` tuple to catch peers
-                                // that respond with another size tx than they announced
-                                debug!(target: "net::tx",
-                                    peer_id=format!("{peer_id:#}"),
-                                    size=size,
-                                    previously_seen_size=previously_seen_size,
-                                    "peer announced a different size for tx, this is especially worrying if either size is very big..."
- ); - } - } - self.transaction_fetcher.eth68_meta.insert(hash, size); - } - hash - }).collect::>(); - ValidTxHashes::new_eth68(hashes) + valid_data } NewPooledTransactionHashes::Eth66(eth66_msg) => { // validate eth66 announcement data @@ -635,12 +643,15 @@ where self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); } - let valid_hashes = valid_data.into_data().into_keys().collect::>(); - - ValidTxHashes::new_eth66(valid_hashes) + valid_data } }; + if valid_announcement_data.is_empty() { + // no valid announcement data + return + } + // 3. filter out already seen unknown hashes // // seen hashes are already in the tx fetcher, pending fetch. @@ -649,46 +660,61 @@ where // fetcher, hence they should be valid at this point. // self.transaction_fetcher.filter_unseen_and_pending_hashes( - &mut hashes, - peer_id, + &mut valid_announcement_data, + &peer_id, |peer_id| self.peers.contains_key(&peer_id), + &client_version, ); - if hashes.is_empty() { + if valid_announcement_data.is_empty() { // nothing to request return } trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes_len=hashes.len(), - hashes=?*hashes, - msg_version=%hashes.msg_version(), + hashes_len=valid_announcement_data.iter().count(), + hashes=?valid_announcement_data.keys().collect::>(), + msg_version=%valid_announcement_data.msg_version(), + client_version=%client_version, "received previously unseen and pending hashes in announcement from peer" ); // only send request for hashes to idle peer, otherwise buffer hashes storing peer as // fallback - if !self.transaction_fetcher.is_idle(peer_id) { + if !self.transaction_fetcher.is_idle(&peer_id) { + // load message version before announcement data is destructed in packing + let msg_version = valid_announcement_data.msg_version(); + let (hashes, _version) = valid_announcement_data.into_request_hashes(); + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?*hashes, - msg_version=%hashes.msg_version(), + msg_version=%msg_version, + client_version=%client_version, "buffering hashes announced by busy peer" ); self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id)); + return } + + // load message version before announcement data is destructed in packing + let msg_version = valid_announcement_data.msg_version(); // demand recommended soft limit on response, however the peer may enforce an arbitrary // limit on the response (2MB) - let surplus_hashes = self.transaction_fetcher.pack_hashes(&mut hashes, peer_id); + let mut hashes_to_request = RequestTxHashes::with_capacity(valid_announcement_data.len()); + let surplus_hashes = + self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data); + hashes_to_request.shrink_to_fit(); if !surplus_hashes.is_empty() { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), surplus_hashes=?*surplus_hashes, - msg_version=%surplus_hashes.msg_version(), + msg_version=%msg_version, + client_version=%client_version, "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" ); @@ -697,8 +723,9 @@ where trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), - hashes=?*hashes, - msg_version=%hashes.msg_version(), + hashes=?*hashes_to_request, + msg_version=%msg_version, + client_version=%client_version, "sending hashes in `GetPooledTransactions` request to peer's session" ); @@ -708,14 +735,17 @@ where let Some(peer) = self.peers.get_mut(&peer_id) else { return }; let metrics = &self.metrics; if let Some(failed_to_request_hashes) = - 
self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || { + self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer, || { metrics.egress_peer_channel_full.increment(1) }) { + let conn_eth_version = peer.version; + debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?*failed_to_request_hashes, - msg_version=%failed_to_request_hashes.msg_version(), + conn_eth_version=%conn_eth_version, + client_version=%client_version, "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" ); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); @@ -724,113 +754,11 @@ where if num_already_seen > 0 { self.metrics.messages_with_already_seen_hashes.increment(1); - trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen hashes"); + trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?client_version, "Peer sent already seen hashes"); self.report_already_seen(peer_id); } } - /// Tries to request hashes in buffer. - /// - /// If any idle fallback peer exists for any hash in the buffer, that hash is taken out of the - /// buffer and put into a request to that peer. Before sending, the request is filled with - /// additional hashes out of the buffer, for which the peer is listed as fallback, as long as - /// space remains in the respective transactions response. - fn request_buffered_hashes(&mut self) { - loop { - let mut hashes = vec![]; - let Some(peer_id) = self.pop_any_idle_peer(&mut hashes) else { return }; - - debug_assert!( - self.peers.contains_key(&peer_id), - "a dead peer has been returned as idle by `@pop_any_idle_peer`, broken invariant `@peers` and `@transaction_fetcher`, -`%peer_id`: {}, -`@peers`: {:?}, -`@transaction_fetcher`: {:?}", - peer_id, self.peers, self.transaction_fetcher - ); - - // fill the request with other buffered hashes that have been announced by the peer - let Some(peer) = self.peers.get(&peer_id) else { return }; - let Some(hash) = hashes.first() else { return }; - - let mut eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied(); - if let Some(ref mut size) = eth68_size { - self.transaction_fetcher.fill_eth68_request_for_peer(&mut hashes, peer_id, size); - } else { - self.transaction_fetcher.fill_eth66_request_for_peer(&mut hashes, peer_id); - } - - let msg_version = peer.version; - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hashes=?hashes, - msg_version=%msg_version, - "requesting buffered hashes from idle peer" - ); - - // request the buffered missing transactions - let metrics = &self.metrics; - if let Some(failed_to_request_hashes) = - self.transaction_fetcher.request_transactions_from_peer( - ValidTxHashes::new(hashes, msg_version), - peer, - || metrics.egress_peer_channel_full.increment(1), - ) - { - debug!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - failed_to_request_hashes=?failed_to_request_hashes, - msg_version=%msg_version, - "failed sending request to peer's session, buffering hashes" - ); - - self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); - return - } - } - } - - /// Returns any idle peer for any buffered unknown hash, and writes that hash to the request's - /// hashes buffer that is passed as parameter. - /// - /// Loops through the fallback peers of each buffered hashes, until an idle fallback peer is - /// found. 
As a side effect, dead fallback peers are filtered out for visited hashes. - fn pop_any_idle_peer(&mut self, hashes: &mut Vec) -> Option { - let mut ended_sessions = vec![]; - let mut buffered_hashes_iter = self.transaction_fetcher.buffered_hashes.iter(); - let peers = &self.peers; - - let idle_peer = loop { - let Some(&hash) = buffered_hashes_iter.next() else { break None }; - - let idle_peer = - self.transaction_fetcher.get_idle_peer_for(hash, &mut ended_sessions, |peer_id| { - peers.contains_key(&peer_id) - }); - for peer_id in ended_sessions.drain(..) { - let (_, peers) = self.transaction_fetcher.unknown_hashes.peek_mut(&hash)?; - _ = peers.remove(&peer_id); - } - if idle_peer.is_some() { - hashes.push(hash); - break idle_peer - } - }; - - let peer_id = &idle_peer?; - let hash = hashes.first()?; - - let (_, peers) = self.transaction_fetcher.unknown_hashes.get(hash)?; - // pop peer from fallback peers - _ = peers.remove(peer_id); - // pop hash that is loaded in request buffer from buffered hashes - drop(buffered_hashes_iter); - _ = self.transaction_fetcher.buffered_hashes.remove(hash); - - idle_peer - } - /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { @@ -890,7 +818,12 @@ where let hashes = self .peers .get(&peer_id) - .map(|peer| peer.transactions.iter().copied().collect::>()) + .map(|peer| { + peer.seen_transactions + .iter_transaction_hashes() + .copied() + .collect::>() + }) .unwrap_or_default(); res.insert(peer_id, hashes); } @@ -914,17 +847,7 @@ where peer_id, client_version, messages, version, .. } => { // insert a new peer into the peerset - self.peers.insert( - peer_id, - Peer { - transactions: LruCache::new( - NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), - ), - request_tx: messages, - version, - client_version, - }, - ); + self.peers.insert(peer_id, Peer::new(messages, version, client_version)); // Send a `NewPooledTransactionHashes` to the peer with up to // `NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT` transactions in the @@ -938,7 +861,7 @@ where let mut msg_builder = PooledTransactionsHashesBuilder::new(version); let pooled_txs = self.pool.pooled_transactions_max( - SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_MEMPOOL_PACKET, + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, ); if pooled_txs.is_empty() { // do not send a message if there are no transactions in the pool @@ -946,7 +869,7 @@ where } for pooled_tx in pooled_txs.into_iter() { - peer.transactions.insert(*pooled_tx.hash()); + peer.seen_transactions.seen_by_peer_and_in_pool(*pooled_tx.hash()); msg_builder.push_pooled(pooled_tx); } @@ -993,8 +916,14 @@ where // track that the peer knows this transaction, but only if this is a new broadcast. // If we received the transactions as the response to our GetPooledTransactions // requests (based on received `NewPooledTransactionHashes`) then we already - // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] - if source.is_broadcast() && !peer.transactions.insert(*tx.hash()) { + // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] as `peer. + // seen_transactions.transactions_received_as_hash`. In that case, we don't move + // hashes from `peer.seen_transactions.transactions_received_as_hash` to + // `peer.seen_transactions.transactions_received_in_full_or_sent` here. The + // division of the `seen_transactions` list, just serves as a hint for tx fetcher + // of which hashes are missing. 
It's good enough without reallocating hashes. + + if source.is_broadcast() && peer.seen_transactions.has_seen_transaction(tx.hash()) { num_already_seen += 1; } @@ -1016,15 +945,25 @@ where // import new transactions as a batch to minimize lock contention on the underlying pool if !new_txs.is_empty() { let pool = self.pool.clone(); + // update metrics let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone(); - metric_pending_pool_imports.increment(new_txs.len() as f64); + // update self-monitoring info + self.pending_pool_imports_info + .pending_pool_imports + .fetch_add(new_txs.len(), Ordering::Relaxed); + let tx_manager_info_pending_pool_imports = + self.pending_pool_imports_info.pending_pool_imports.clone(); + let import = Box::pin(async move { let added = new_txs.len(); let res = pool.add_external_transactions(new_txs).await; + // update metrics metric_pending_pool_imports.decrement(added as f64); + // update self-monitoring info + tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed); res }); @@ -1076,7 +1015,8 @@ where self.transactions_by_peers.remove(&hash); } - /// Penalize the peers that sent the bad transaction + /// Penalize the peers that sent the bad transaction and cache it to avoid fetching or + /// importing it again. fn on_bad_import(&mut self, hash: TxHash) { if let Some(peers) = self.transactions_by_peers.remove(&hash) { for peer_id in peers { @@ -1084,6 +1024,14 @@ where } } } + + /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns + /// `false` if [`TransactionsManager`] is operating close to full capacity. + fn has_capacity_for_fetching_pending_hashes(&self) -> bool { + self.pending_pool_imports_info + .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) && + self.transaction_fetcher.has_capacity_for_fetching_pending_hashes() + } } /// An endless future. Preemption ensure that future is non-blocking, nonetheless. See @@ -1112,6 +1060,23 @@ where some_ready = true; } + if this.has_capacity_for_fetching_pending_hashes() { + // try drain buffered transactions. + let info = &this.pending_pool_imports_info; + let max_pending_pool_imports = info.max_pending_pool_imports; + let has_capacity_wrt_pending_pool_imports = + |divisor| info.has_capacity(max_pending_pool_imports / divisor); + + let metrics = &this.metrics; + let metrics_increment_egress_peer_channel_full = + || metrics.egress_peer_channel_full.increment(1); + + this.transaction_fetcher.on_fetch_pending_hashes( + &this.peers, + has_capacity_wrt_pending_pool_imports, + metrics_increment_egress_peer_channel_full, + ); + } // drain commands if let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { this.on_command(cmd); @@ -1124,7 +1089,7 @@ where some_ready = true; } - this.update_request_metrics(); + this.update_fetch_metrics(); // drain fetching transaction events if let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) { @@ -1144,11 +1109,7 @@ where some_ready = true; } - if this.enable_tx_refetch { - // try drain buffered transactions - this.request_buffered_hashes(); - } - this.update_request_metrics(); + this.update_fetch_metrics(); // Advance all imports if let Poll::Ready(Some(batch_import_res)) = this.pool_imports.poll_next_unpin(cx) { @@ -1226,7 +1187,7 @@ impl PropagateTransaction { } /// Helper type for constructing the full transaction message that enforces the -/// [`SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE`]. 
+/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`]. #[derive(Default)] struct FullTransactionsBuilder { total_size: usize, @@ -1240,10 +1201,11 @@ impl FullTransactionsBuilder { /// maximum target byte size. The limit is soft, meaning if one single transaction goes over /// the limit, it will be broadcasted in its own [`Transactions`] message. The same pattern is /// followed in filling a [`GetPooledTransactions`] request in - /// [`TransactionFetcher::fill_eth68_request_for_peer`]. + /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`]. fn push(&mut self, transaction: &PropagateTransaction) { let new_size = self.total_size + transaction.size; - if new_size > SOFT_LIMIT_BYTE_SIZE_FULL_TRANSACTIONS_MEMPOOL_MESSAGE && self.total_size > 0 + if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE && + self.total_size > 0 { return } @@ -1331,11 +1293,72 @@ impl TransactionSource { } } +/// Tracks transactions a peer has seen. +#[derive(Debug)] +struct TransactionsSeenByPeer { + /// Keeps track of transactions that we know the peer has seen because they were announced by + /// the peer. It's possible that these transactions are pending fetch. + transactions_received_as_hash: LruCache, + /// Keeps track of transactions that we know the peer has seen because they were received in + /// full from the peer or sent to the peer. + transactions_received_in_full_or_sent: LruCache, +} + +impl TransactionsSeenByPeer { + /// Returns `true` if peer has seen transaction. + fn has_seen_transaction(&self, hash: &TxHash) -> bool { + self.transactions_received_in_full_or_sent.contains(hash) || + self.transactions_received_as_hash.contains(hash) + } + + /// Inserts a transaction hash that has been seen in an announcement. + fn seen_in_announcement(&mut self, hash: TxHash) { + _ = self.transactions_received_as_hash.insert(hash); + } + + /// Inserts a hash of a transaction that has either been sent to the peer, or has been + /// received in full from the peer over broadcast. + fn seen_by_peer_and_in_pool(&mut self, hash: TxHash) { + _ = self.transactions_received_in_full_or_sent.insert(hash); + } + + /// Inserts a list of transactions that have either been sent to the peer, or have been + /// received in full from the peer over broadcast. + fn extend_seen_by_peer_and_in_pool(&mut self, hashes: impl IntoIterator) { + self.transactions_received_in_full_or_sent.extend(hashes) + } + + /// Returns an iterator over all transactions that the peer has seen. + fn iter_transaction_hashes(&self) -> impl Iterator { + self.transactions_received_as_hash + .iter() + .chain(self.transactions_received_in_full_or_sent.iter()) + } + + /// Returns an iterator over all transaction hashes that the peer has sent in an announcement. + fn maybe_pending_transaction_hashes(&self) -> &LruCache { + &self.transactions_received_as_hash + } +} + +impl Default for TransactionsSeenByPeer { + fn default() -> Self { + Self { + transactions_received_as_hash: LruCache::new( + NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL).unwrap(), + ), + transactions_received_in_full_or_sent: LruCache::new( + NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL).unwrap(), + ), + } + } +} + /// Tracks a single peer #[derive(Debug)] struct Peer { /// Keeps track of transactions that we know the peer has seen. - transactions: LruCache, + seen_transactions: TransactionsSeenByPeer, /// A communication channel directly to the peer's session task. 
request_tx: PeerRequestSender, /// negotiated version of the session. @@ -1344,6 +1367,17 @@ struct Peer { client_version: Arc, } +impl Peer { + fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc) -> Self { + Self { + seen_transactions: TransactionsSeenByPeer::default(), + request_tx, + version, + client_version, + } + } +} + /// Commands to send to the [`TransactionsManager`] #[derive(Debug)] enum TransactionsCommand { @@ -1397,12 +1431,33 @@ pub enum NetworkTransactionEvent { }, } +/// Tracks stats about the [`TransactionsManager`]. +#[derive(Debug)] +struct PendingPoolImportsInfo { + /// Number of transactions about to be imported into the pool. + pending_pool_imports: Arc, + /// Max number of transactions about to be imported into the pool. + max_pending_pool_imports: usize, +} + +impl PendingPoolImportsInfo { + pub fn new(max_pending_pool_imports: usize) -> Self { + Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports } + } + + /// Returns `true` if the number of pool imports is under a given tolerated max. + pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool { + self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports + } +} + #[cfg(test)] mod tests { + use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS; + use super::*; use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager}; use alloy_rlp::Decodable; - use fetcher::MAX_ALTERNATIVE_PEERS_PER_TX; use futures::FutureExt; use reth_interfaces::sync::{NetworkSyncUpdater, SyncState}; use reth_network_api::NetworkInfo; @@ -1411,6 +1466,7 @@ mod tests { use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; use secp256k1::SecretKey; use std::{future::poll_fn, hash}; + use tests::fetcher::TxFetchMetadata; async fn new_tx_manager() -> TransactionsManager { let secret_key = SecretKey::new(&mut rand::thread_rng()); @@ -1435,24 +1491,19 @@ mod tests { } pub(super) fn default_cache() -> LruCache { - let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).unwrap(); + let limit = NonZeroUsize::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS.into()).unwrap(); LruCache::new(limit) } // Returns (peer, channel-to-send-get-pooled-tx-response-on). 
- fn new_mock_session( + pub(super) fn new_mock_session( peer_id: PeerId, version: EthVersion, ) -> (Peer, mpsc::Receiver) { let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1); ( - Peer { - transactions: default_cache(), - request_tx: PeerRequestSender::new(peer_id, to_mock_session_tx), - version, - client_version: Arc::from(""), - }, + Peer::new(PeerRequestSender::new(peer_id, to_mock_session_tx), version, Arc::from("")), to_mock_session_rx, ) } @@ -1806,7 +1857,7 @@ mod tests { } #[tokio::test] - async fn max_retries_tx_request() { + async fn test_max_retries_tx_request() { reth_tracing::init_test_tracing(); let mut tx_manager = new_tx_manager().await; @@ -1817,7 +1868,11 @@ mod tests { let eth_version = EthVersion::Eth66; let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])]; - let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version); + let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version); + // mark hashes as seen by peer so it can fish them out from the cache for hashes pending + // fetch + peer_1.seen_transactions.seen_in_announcement(seen_hashes[0]); + peer_1.seen_transactions.seen_in_announcement(seen_hashes[1]); tx_manager.peers.insert(peer_id_1, peer_1); // hashes are seen and currently not inflight, with one fallback peer, and are buffered @@ -1825,22 +1880,26 @@ mod tests { let retries = 1; let mut backups = default_cache(); backups.insert(peer_id_1); - tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups.clone())); - tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups)); - tx_fetcher.buffered_hashes.insert(seen_hashes[1]); - tx_fetcher.buffered_hashes.insert(seen_hashes[0]); + tx_fetcher + .hashes_fetch_inflight_and_pending_fetch + .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups.clone(), None)); + tx_fetcher + .hashes_fetch_inflight_and_pending_fetch + .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups, None)); + tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]); + tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]); // peer_1 is idle - assert!(tx_fetcher.is_idle(peer_id_1)); + assert!(tx_fetcher.is_idle(&peer_id_1)); // sends request for buffered hashes to peer_1 - tx_manager.request_buffered_hashes(); + tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true, || ()); let tx_fetcher = &mut tx_manager.transaction_fetcher; - assert!(tx_fetcher.buffered_hashes.is_empty()); + assert!(tx_fetcher.hashes_pending_fetch.is_empty()); // as long as request is in inflight peer_1 is not idle - assert!(!tx_fetcher.is_idle(peer_id_1)); + assert!(!tx_fetcher.is_idle(&peer_id_1)); // mock session of peer_1 receives request let req = to_mock_session_rx @@ -1861,9 +1920,9 @@ mod tests { }; // request has resolved, peer_1 is idle again - assert!(tx_fetcher.is_idle(peer_id)); + assert!(tx_fetcher.is_idle(&peer_id)); // failing peer_1's request buffers requested hashes for retry - assert_eq!(tx_fetcher.buffered_hashes.len(), 2); + assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2); let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version); tx_manager.peers.insert(peer_id_2, peer_2); @@ -1876,9 +1935,9 @@ mod tests { let tx_fetcher = &mut tx_manager.transaction_fetcher; // since hashes are already seen, no changes to length of unknown hashes - assert_eq!(tx_fetcher.unknown_hashes.len(), 2); + assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2); // but hashes are taken out of buffer and packed into 
request to peer_2 - assert!(tx_fetcher.buffered_hashes.is_empty()); + assert!(tx_fetcher.hashes_pending_fetch.is_empty()); // mock session of peer_2 receives request let req = to_mock_session_rx @@ -1893,97 +1952,8 @@ mod tests { .expect("should send peer_2 response to tx manager"); let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() }; - // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached however this time won't be - // buffered for retry - assert!(tx_fetcher.buffered_hashes.is_empty()); + // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered + // for retry + assert!(tx_fetcher.hashes_pending_fetch.is_empty()); } - - /*#[tokio::test] - async fn fill_eth68_request_for_peer() { - reth_tracing::init_test_tracing(); - - let mut tx_manager = new_tx_manager().await; - let tx_fetcher = &mut tx_manager.transaction_fetcher; - - let peer_id = PeerId::new([1; 64]); - let eth_version = EthVersion::Eth68; - let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])]; - let unseen_eth68_hashes_sizes = [ - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE / 4 - 1, - POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE / 4 - 5, - ]; - // hashes and sizes to buffer in reverse order so that seen_eth68_hashes[0] and - // seen_eth68_hashes_sizes[0] are lru - let seen_eth68_hashes = - [B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32])]; - let seen_eth68_hashes_sizes = [ - 5, - 3, // the second hash should be filled into the request because there is space for it - 4, // then there is no space for the last anymore - ]; - - // insert peer in tx manager - let (peer, _to_mock_session_rx) = new_mock_session(peer_id, eth_version); - tx_manager.peers.insert(peer_id, peer); - - // hashes are seen and currently not inflight, with one fallback peer, and are buffered - // for first try to fetch. - let mut backups = default_cache(); - backups.insert(peer_id); - - // load in reverse order so index 0 in seen_eth68_hashes and seen_eth68_hashes_sizes is - // lru! 
-
-        for i in (0..3).rev() {
-            tx_fetcher.unknown_hashes.insert(seen_eth68_hashes[i], (0, backups.clone()));
-            tx_fetcher.eth68_meta.insert(seen_eth68_hashes[i], seen_eth68_hashes_sizes[i]);
-            tx_fetcher.buffered_hashes.insert(seen_eth68_hashes[i]);
-        }
-
-        // insert buffered hash for some other peer too, to verify response size accumulation and
-        // hash selection for peer from buffered hashes
-        let peer_id_other = PeerId::new([2; 64]);
-        let hash_other = B256::from_slice(&[6; 32]);
-        let mut backups = default_cache();
-        backups.insert(peer_id_other);
-        tx_fetcher.unknown_hashes.insert(hash_other, (0, backups));
-        tx_fetcher
-            .eth68_meta
-            .insert(hash_other, POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE - 2); // a big tx
-        tx_fetcher.buffered_hashes.insert(hash_other);
-
-        let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version);
-        tx_manager.peers.insert(peer_id, peer);
-
-        // peer announces previously unseen hashes
-        let msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
-            hashes: unseen_eth68_hashes.to_vec(),
-            sizes: unseen_eth68_hashes_sizes.to_vec(),
-            types: [0; 2].to_vec(),
-        });
-        tx_manager.request_buffered_hashes();
-
-        let tx_fetcher = &mut tx_manager.transaction_fetcher;
-
-        // since hashes are unseen, length of unknown hashes increases
-        assert_eq!(tx_fetcher.unknown_hashes.len(), 6);
-        // seen_eth68_hashes[1] should be taken out of buffer and packed into request
-        assert_eq!(tx_fetcher.buffered_hashes.len(), 3);
-        assert!(tx_fetcher.buffered_hashes.contains(&seen_eth68_hashes[0]));
-
-        // mock session of peer receives request
-        let req = to_mock_session_rx
-            .recv()
-            .await
-            .expect("peer session should receive request with buffered hashes");
-        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
-        let GetPooledTransactions(mut hashes) = request;
-
-        let mut expected_request = unseen_eth68_hashes.to_vec();
-        expected_request.push(seen_eth68_hashes[1]);
-
-        hashes.sort();
-
-        assert_eq!(hashes, expected_request);
-    }*/
 }
diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs
index c1a607984c5b..a57709f13608 100644
--- a/crates/transaction-pool/src/lib.rs
+++ b/crates/transaction-pool/src/lib.rs
@@ -455,7 +455,7 @@ where
         self.pool.remove_transactions(hashes)
     }
 
-    fn retain_unknown<A>(&self, announcement: &mut A)
+    fn retain_unknown<A>(&self, announcement: &mut A) -> Option<A>
     where
         A: HandleAnnouncement,
     {
diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs
index ad6a1843fe36..98dfd24927ca 100644
--- a/crates/transaction-pool/src/noop.rs
+++ b/crates/transaction-pool/src/noop.rs
@@ -174,10 +174,11 @@ impl TransactionPool for NoopTransactionPool {
         vec![]
     }
 
-    fn retain_unknown<A>(&self, _announcement: &mut A)
+    fn retain_unknown<A>(&self, _announcement: &mut A) -> Option<A>
     where
         A: HandleAnnouncement,
     {
+        None
     }
 
     fn get(&self, _tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs
index b7c733a4dd51..58ab12a753b5 100644
--- a/crates/transaction-pool/src/pool/mod.rs
+++ b/crates/transaction-pool/src/pool/mod.rs
@@ -669,16 +669,16 @@ where
         removed
     }
 
-    /// Removes all transactions that are present in the pool.
-    pub(crate) fn retain_unknown<A>(&self, announcement: &mut A)
+    /// Removes and returns all transactions that are present in the pool.
+    pub(crate) fn retain_unknown<A>(&self, announcement: &mut A) -> Option<A>
     where
         A: HandleAnnouncement,
     {
         if announcement.is_empty() {
-            return
+            return None
         }
 
         let pool = self.get_pool_data();
-        announcement.retain_by_hash(|tx| !pool.contains(&tx))
+        Some(announcement.retain_by_hash(|tx| !pool.contains(tx)))
     }
 
     /// Returns the transaction by hash.
diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs
index 092cb53b7d02..7bd926f0f760 100644
--- a/crates/transaction-pool/src/traits.rs
+++ b/crates/transaction-pool/src/traits.rs
@@ -279,10 +279,10 @@ pub trait TransactionPool: Send + Sync + Clone {
 
     /// Retains only those hashes that are unknown to the pool.
    /// In other words, removes all transactions from the given set that are currently present in
-    /// the pool.
+    /// the pool. Returns hashes already known to the pool.
     ///
     /// Consumer: P2P
-    fn retain_unknown<A>(&self, announcement: &mut A)
+    fn retain_unknown<A>(&self, announcement: &mut A) -> Option<A>
     where
         A: HandleAnnouncement;
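
Reviewer note (not part of the diff above): a minimal sketch of how the changed `retain_unknown` contract can be consumed. The helper below and its import paths are assumptions for illustration only; the `TransactionPool::retain_unknown` signature and the `HandleAnnouncement` bound are the only parts taken from the hunks above.

    // Illustrative only: filter an incoming announcement against the pool and hand back
    // the hashes the pool already knows, e.g. so a caller can still credit them to the
    // announcing peer without fetching them again.
    use reth_eth_wire::HandleAnnouncement; // assumed re-export path
    use reth_transaction_pool::TransactionPool; // assumed re-export path

    fn split_announcement<P, A>(pool: &P, mut announcement: A) -> (A, Option<A>)
    where
        P: TransactionPool,
        A: HandleAnnouncement,
    {
        // `retain_unknown` drops the hashes the pool already contains from `announcement`
        // in place and returns that already-known part, or `None` for an empty announcement.
        let already_known = pool.retain_unknown(&mut announcement);
        // `announcement` now only carries hashes that are new to the pool and worth fetching.
        (announcement, already_known)
    }

For `NoopTransactionPool`, the hunk above makes `retain_unknown` return `None`, so a caller treats every announced hash as unknown and nothing as already known.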