diff --git a/Cargo.lock b/Cargo.lock index 62aebd6b02b9..c37038149edc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1449,7 +1449,7 @@ version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ - "crossterm 0.27.0", + "crossterm", "strum", "strum_macros", "unicode-width", @@ -1664,22 +1664,6 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" -[[package]] -name = "crossterm" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" -dependencies = [ - "bitflags 1.3.2", - "crossterm_winapi", - "libc", - "mio", - "parking_lot 0.12.1", - "signal-hook", - "signal-hook-mio", - "winapi", -] - [[package]] name = "crossterm" version = "0.27.0" @@ -3696,6 +3680,12 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" + [[package]] name = "infer" version = "0.2.3" @@ -5517,6 +5507,25 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "ratatui" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5659e52e4ba6e07b2dad9f1158f578ef84a73762625ddb51536019f34d180eb" +dependencies = [ + "bitflags 2.4.1", + "cassowary", + "crossterm", + "indoc", + "itertools 0.12.0", + "lru 0.12.1", + "paste", + "stability", + "strum", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "raw-cpuid" version = "10.7.0" @@ -5690,7 +5699,7 @@ dependencies = [ "comfy-table", "confy", "const-str", - "crossterm 0.27.0", + "crossterm", "dirs-next", "eyre", "fdlimit", @@ -5712,6 +5721,7 @@ dependencies = [ "procfs", "proptest", "rand 0.8.5", + "ratatui", "rayon", "reth-auto-seal-consensus", "reth-basic-payload-builder", @@ -5760,7 +5770,6 @@ dependencies = [ "tokio", "toml 0.8.8", "tracing", - "tui", "vergen", ] @@ -6273,6 +6282,7 @@ dependencies = [ "fnv", "futures", "humantime-serde", + "itertools 0.12.0", "linked-hash-map", "linked_hash_set", "metrics", @@ -7804,6 +7814,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" +[[package]] +name = "stability" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd1b177894da2a2d9120208c3386066af06a488255caabc5de8ddca22dbc3ce" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -8655,19 +8675,6 @@ dependencies = [ "termcolor", ] -[[package]] -name = "tui" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccdd26cbd674007e649a272da4475fb666d3aa0ad0531da7136db6fab0e5bad1" -dependencies = [ - "bitflags 1.3.2", - "cassowary", - "crossterm 0.25.0", - "unicode-segmentation", - "unicode-width", -] - [[package]] name = "tungstenite" version = "0.20.1" diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index b74be8eedd01..302e1e3a7e3a 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -90,7 +90,7 @@ rand.workspace = true # tui comfy-table = "7.0" crossterm = "0.27.0" -tui = "0.19.0" +ratatui = "0.25.0" human_bytes = "0.4.1" # async diff --git a/bin/reth/src/commands/db/tui.rs b/bin/reth/src/commands/db/tui.rs index c7b882faecbd..85fbeb079bad 100644 --- a/bin/reth/src/commands/db/tui.rs +++ b/bin/reth/src/commands/db/tui.rs @@ -3,6 +3,13 @@ use crossterm::{ execute, terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; +use ratatui::{ + backend::{Backend, CrosstermBackend}, + layout::{Alignment, Constraint, Direction, Layout}, + style::{Color, Modifier, Style}, + widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap}, + Frame, Terminal, +}; use reth_db::{ table::{Table, TableRow}, RawValue, @@ -12,13 +19,6 @@ use std::{ time::{Duration, Instant}, }; use tracing::error; -use tui::{ - backend::{Backend, CrosstermBackend}, - layout::{Alignment, Constraint, Corner, Direction, Layout}, - style::{Color, Modifier, Style}, - widgets::{Block, Borders, List, ListItem, ListState, Paragraph, Wrap}, - Frame, Terminal, -}; /// Available keybindings for the [DbListTUI] static CMDS: [(&str, &str); 6] = [ @@ -356,7 +356,7 @@ where } /// Render the UI -fn ui(f: &mut Frame<'_, B>, app: &mut DbListTUI) +fn ui(f: &mut Frame<'_>, app: &mut DbListTUI) where F: FnMut(usize, usize) -> Vec>, { @@ -392,8 +392,7 @@ where ))) .style(Style::default().fg(Color::White)) .highlight_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::ITALIC)) - .highlight_symbol("➜ ") - .start_corner(Corner::TopLeft); + .highlight_symbol("➜ "); f.render_stateful_widget(key_list, inner_chunks[0], &mut app.list_state); let value_display = Paragraph::new( diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 4354917be6ba..fb9e482841c4 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -144,6 +144,14 @@ impl NewPooledTransactionHashes { } } + /// Returns a mutable reference to transaction hashes. + pub fn hashes_mut(&mut self) -> &mut Vec { + match self { + NewPooledTransactionHashes::Eth66(msg) => &mut msg.0, + NewPooledTransactionHashes::Eth68(msg) => &mut msg.hashes, + } + } + /// Consumes the type and returns all hashes pub fn into_hashes(self) -> Vec { match self { @@ -188,6 +196,15 @@ impl NewPooledTransactionHashes { NewPooledTransactionHashes::Eth68(msg) => msg.hashes.len(), } } + + /// Returns an iterator over tx hashes zipped with corresponding eth68 metadata if this is + /// an eth68 message. + pub fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> { + match self { + NewPooledTransactionHashes::Eth66(_) => None, + NewPooledTransactionHashes::Eth68(msg) => Some(msg), + } + } } impl From for EthMessage { @@ -265,6 +282,13 @@ pub struct NewPooledTransactionHashes68 { pub hashes: Vec, } +impl NewPooledTransactionHashes68 { + /// Returns an iterator over tx hashes zipped with corresponding metadata. + pub fn metadata_iter(&self) -> impl Iterator { + self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied())) + } +} + impl Encodable for NewPooledTransactionHashes68 { fn encode(&self, out: &mut dyn bytes::BufMut) { #[derive(RlpEncodable)] diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 3812c7350e44..35704f58054a 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -65,6 +65,7 @@ rand.workspace = true secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] } derive_more.workspace = true schnellru.workspace = true +itertools.workspace = true enr = { workspace = true, features = ["rust-secp256k1"], optional = true } tempfile = { workspace = true, optional = true } diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 520c125168ae..4ccf48e601d8 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -53,13 +53,13 @@ impl LruCache { /// Remove the least recently used entry and return it. /// - /// If the `LruCache` is empty this will return None. + /// If the `LruCache` is empty or if the eviction feedback is + /// configured, this will return None. #[inline] fn remove_lru(&mut self) -> Option { self.inner.pop_front() } - #[allow(dead_code)] /// Expels the given value. Returns true if the value existed. pub fn remove(&mut self, value: &T) -> bool { self.inner.remove(value) @@ -80,14 +80,12 @@ impl LruCache { } /// Returns number of elements currently in cache. - #[cfg(test)] #[allow(dead_code)] pub fn len(&self) -> usize { self.inner.len() } /// Returns `true` if there are currently no elements in the cache. - #[cfg(test)] #[allow(dead_code)] pub fn is_empty(&self) -> bool { self.inner.is_empty() @@ -136,7 +134,6 @@ where K: Hash + PartialEq, { /// Returns a new cache with default limiter and hash builder. - #[allow(dead_code)] pub fn new(max_length: u32) -> Self { LruMap(schnellru::LruMap::new(ByLength::new(max_length))) } @@ -147,7 +144,6 @@ where K: Hash + PartialEq, { /// Returns a new cache with [`Unlimited`] limiter and default hash builder. - #[allow(dead_code)] pub fn new_unlimited() -> Self { LruMap(schnellru::LruMap::new(Unlimited)) } diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 58c7ad8a1675..3bf677b01a9d 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -160,7 +160,7 @@ impl ActiveSession { // request was already timed out internally self.update_request_timeout(req.timestamp, Instant::now()); } - }; + } } else { // we received a response to a request we never sent self.on_bad_message(); diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 3a9fc5a4f939..6338b6a37f5a 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -21,6 +21,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; + use tracing::trace; #[cfg_attr(doc, aquamarine::aquamarine)] diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs new file mode 100644 index 000000000000..587774f1cd22 --- /dev/null +++ b/crates/net/network/src/transactions/fetcher.rs @@ -0,0 +1,711 @@ +use crate::{ + cache::{LruCache, LruMap}, + message::PeerRequest, +}; +use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; +use itertools::Itertools; +use pin_project::pin_project; +use reth_eth_wire::GetPooledTransactions; +use reth_interfaces::p2p::error::{RequestError, RequestResult}; +use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; +use schnellru::{ByLength, Unlimited}; +use std::{ + num::NonZeroUsize, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; +use tracing::{debug, trace}; + +use super::{Peer, PooledTransactions, MAX_FULL_TRANSACTIONS_PACKET_SIZE}; + +/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer. +pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1; + +/// How many peers we keep track of for each missing transaction. +pub(super) const MAX_ALTERNATIVE_PEERS_PER_TX: u8 = + MAX_REQUEST_RETRIES_PER_TX_HASH + MARGINAL_FALLBACK_PEERS_PER_TX; + +/// Marginal on fallback peers. If all fallback peers are idle, at most +/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`] of them can ever be needed. +const MARGINAL_FALLBACK_PEERS_PER_TX: u8 = 1; + +/// Maximum request retires per [`TxHash`]. Note, this is reset should the [`TxHash`] re-appear in +/// an announcement after it has been ejected from the hash buffer. +const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2; + +/// Maximum concurrent [`GetPooledTxRequest`]s. +const MAX_CONCURRENT_TX_REQUESTS: u32 = 10000; + +/// Cache limit of transactions waiting for idle peer to be fetched. +const MAX_CAPACITY_BUFFERED_HASHES: usize = 100 * GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES; + +/// Recommended soft limit for the number of hashes in a GetPooledTransactions message (8kb) +/// +/// +const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256; + +/// The type responsible for fetching missing transactions from peers. +/// +/// This will keep track of unique transaction hashes that are currently being fetched and submits +/// new requests on announced hashes. +#[derive(Debug)] +#[pin_project] +pub(super) struct TransactionFetcher { + /// All peers to which a request for pooled transactions is currently active. Maps 1-1 to + /// `inflight_requests`. + pub(super) active_peers: LruMap, + /// All currently active requests for pooled transactions. + #[pin] + pub(super) inflight_requests: FuturesUnordered, + /// Hashes that are awaiting fetch from an idle peer. + pub(super) buffered_hashes: LruCache, + /// Tracks all hashes that are currently being fetched or are buffered, mapping them to + /// request retries and last recently seen fallback peers (max one request try for any peer). + pub(super) unknown_hashes: LruMap), Unlimited>, + /// Size metadata for unknown eth68 hashes. + pub(super) eth68_meta: LruMap, +} + +// === impl TransactionFetcher === + +impl TransactionFetcher { + /// Removes the specified hashes from inflight tracking. + #[inline] + fn remove_from_unknown_hashes(&mut self, hashes: I) + where + I: IntoIterator, + { + for hash in hashes { + self.unknown_hashes.remove(&hash); + self.eth68_meta.remove(&hash); + } + } + + /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`]. + fn update_peer_activity(&mut self, resp: &GetPooledTxResponse) { + let GetPooledTxResponse { peer_id, .. } = resp; + + debug_assert!( + self.active_peers.get(peer_id).is_some(), + "broken invariant `active-peers` and `inflight-requests`" + ); + + let remove = || -> bool { + if let Some(inflight_count) = self.active_peers.get(peer_id) { + if *inflight_count <= 1 { + return true + } + *inflight_count -= 1; + } + false + }(); + + if remove { + self.active_peers.remove(peer_id); + } + } + + /// Returns `true` if peer is idle. + pub(super) fn is_idle(&self, peer_id: PeerId) -> bool { + let Some(inflight_count) = self.active_peers.peek(&peer_id) else { return true }; + if *inflight_count < MAX_CONCURRENT_TX_REQUESTS_PER_PEER { + return true + } + false + } + + /// Returns any idle peer for the given hash. Writes peer IDs of any ended sessions to buffer + /// passed as parameter. + pub(super) fn get_idle_peer_for( + &self, + hash: TxHash, + ended_sessions_buf: &mut Vec, + is_session_active: impl Fn(PeerId) -> bool, + ) -> Option { + let (_, peers) = self.unknown_hashes.peek(&hash)?; + + for &peer_id in peers.iter() { + if self.is_idle(peer_id) { + if is_session_active(peer_id) { + return Some(peer_id) + } else { + ended_sessions_buf.push(peer_id); + } + } + } + + None + } + + /// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes. + pub(super) fn pack_hashes(&mut self, hashes: &mut Vec, peer_id: PeerId) -> Vec { + let Some(hash) = hashes.first() else { return vec![] }; + + if self.eth68_meta.get(hash).is_some() { + return self.pack_hashes_eth68(hashes, peer_id) + } + self.pack_hashes_eth66(hashes, peer_id) + } + + /// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 66. + /// If necessary, takes hashes from buffer for which peer is listed as fallback peer. + /// + /// Returns left over hashes. + pub(super) fn pack_hashes_eth66( + &mut self, + hashes: &mut Vec, + peer_id: PeerId, + ) -> Vec { + if hashes.len() < GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { + self.fill_request_for_peer(hashes, peer_id, None); + return vec![] + } + hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES) + } + + /// Evaluates wether or not to include a hash in a `GetPooledTransactions` version eth68 + /// request, based on the size of the transaction and the accumulated size of the + /// corresponding `PooledTransactions` response. + /// + /// Returns `true` if hash is included in request. If there is still space in the respective + /// response but not enough for the transaction of given hash, `false` is returned. + fn include_eth68_hash(&self, acc_size_response: &mut usize, eth68_hash: TxHash) -> bool { + debug_assert!( + self.eth68_meta.peek(ð68_hash).is_some(), + "broken invariant `eth68-hash` and `eth68-meta`" + ); + + if let Some(size) = self.eth68_meta.peek(ð68_hash) { + let next_acc_size = *acc_size_response + size; + + if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE { + // only update accumulated size of tx response if tx will fit in + *acc_size_response = next_acc_size; + return true + } + } + + false + } + + /// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 68. + /// If necessary, takes hashes from buffer for which peer is listed as fallback peer. Returns + /// left over hashes. + /// + /// 1. Loops through hashes passed as parameter, calculating the accumulated size of the + /// response that this request would generate if filled with requested hashes. + /// 2.a. All hashes fit in response and there is no more space. Returns empty vector. + /// 2.b. Some hashes didn't fit in and there is no more space. Returns surplus hashes. + /// 2.c. All hashes fit in response and there is still space. Surplus hashes = empty vector. + /// 2.d. Some hashes didn't fit in but there is still space. Surplus hashes != empty vector. + /// 3. Try to fill remaining space with hashes from buffer. + /// 4. Return surplus hashes. + pub(super) fn pack_hashes_eth68( + &mut self, + hashes: &mut Vec, + peer_id: PeerId, + ) -> Vec { + let mut acc_size_response = 0; + let mut surplus_hashes = vec![]; + + hashes.retain(|&hash| match self.include_eth68_hash(&mut acc_size_response, hash) { + true => true, + false => { + trace!( + target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=format!("{hash:#}"), + size=self.eth68_meta.get(&hash).expect("should find size in `eth68-meta`"), + acc_size_response=acc_size_response, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "no space for hash in `GetPooledTransactions` request to peer" + ); + + surplus_hashes.push(hash); + false + } + }); + + // all hashes included in request and there is still space + // todo: compare free space with min tx size + if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE { + self.fill_request_for_peer(hashes, peer_id, Some(acc_size_response)); + } + + surplus_hashes + } + + pub(super) fn buffer_hashes_for_retry(&mut self, hashes: impl IntoIterator) { + self.buffer_hashes(hashes, None) + } + + /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be + /// passed as `fallback_peer` parameter! Hashes that have been re-requested + /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. + pub(super) fn buffer_hashes( + &mut self, + hashes: impl IntoIterator, + fallback_peer: Option, + ) { + let mut max_retried_hashes = vec![]; + + for hash in hashes { + // todo: enforce by adding new type UnknownTxHash + debug_assert!( + self.unknown_hashes.peek(&hash).is_some(), + "only hashes that are confirmed as unknown should be buffered" + ); + + let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; + + if let Some(peer_id) = fallback_peer { + // peer has not yet requested hash + peers.insert(peer_id); + } else { + // peer in caller's context has requested hash and is hence not eligible as + // fallback peer. + if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH { + debug!(target: "net::tx", + hash=format!("{hash:#}"), + retries=retries, + "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" + ); + max_retried_hashes.push(hash); + continue; + } + *retries += 1; + } + if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) { + _ = self.unknown_hashes.remove(&evicted_hash); + _ = self.eth68_meta.remove(&evicted_hash); + } + } + + self.remove_from_unknown_hashes(max_retried_hashes); + } + + /// Removes the provided transaction hashes from the inflight requests set. + /// + /// This is called when we receive full transactions that are currently scheduled for fetching. + #[inline] + pub(super) fn on_received_full_transactions_broadcast( + &mut self, + hashes: impl IntoIterator, + ) { + self.remove_from_unknown_hashes(hashes) + } + + pub(super) fn filter_unseen_hashes( + &mut self, + new_announced_hashes: &mut Vec, + peer_id: PeerId, + is_session_active: impl Fn(PeerId) -> bool, + ) { + // filter out inflight hashes, and register the peer as fallback for all inflight hashes + new_announced_hashes.retain(|hash| { + // occupied entry + if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(hash) { + // hash has been seen but is not inflight + if self.buffered_hashes.remove(hash) { + return true + } + // hash has been seen and is in flight. store peer as fallback peer. + // + // remove any ended sessions, so that in case of a full cache, alive peers aren't + // removed in favour of lru dead peers + let mut ended_sessions = vec!(); + for &peer_id in backups.iter() { + if is_session_active(peer_id) { + ended_sessions.push(peer_id); + } + } + for peer_id in ended_sessions { + backups.remove(&peer_id); + } + backups.insert(peer_id); + return false + } + // vacant entry + trace!( + target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=format!("{hash:#}"), + "new hash seen in announcement by peer" + ); + + // todo: allow `MAX_ALTERNATIVE_PEERS_PER_TX` to be zero + let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).expect("MAX_ALTERNATIVE_PEERS_PER_TX should be non-zero"); + + if self.unknown_hashes.get_or_insert(*hash, || + (0, LruCache::new(limit)) + ).is_none() { + + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=format!("{hash:#}"), + "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash" + ); + + return false + } + true + }); + } + + /// Requests the missing transactions from the announced hashes of the peer. Returns the + /// requested hashes if concurrency limit is reached or if the request fails to send over the + /// channel to the peer's session task. + /// + /// This filters all announced hashes that are already in flight, and requests the missing, + /// while marking the given peer as an alternative peer for the hashes that are already in + /// flight. + pub(super) fn request_transactions_from_peer( + &mut self, + new_announced_hashes: Vec, + peer: &Peer, + metrics_increment_egress_peer_channel_full: impl FnOnce(), + ) -> Option> { + let peer_id: PeerId = peer.request_tx.peer_id; + + if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + limit=MAX_CONCURRENT_TX_REQUESTS, + "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer" + ); + return Some(new_announced_hashes) + } + + let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + "failed to cache active peer in schnellru::LruMap, dropping request to peer" + ); + return Some(new_announced_hashes) + }; + + if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")), + limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER, + "limit for concurrent `GetPooledTransactions` requests per peer reached" + ); + return Some(new_announced_hashes) + } + + *inflight_count += 1; + + let (response, rx) = oneshot::channel(); + let req: PeerRequest = PeerRequest::GetPooledTransactions { + request: GetPooledTransactions(new_announced_hashes.clone()), + response, + }; + + // try to send the request to the peer + if let Err(err) = peer.request_tx.try_send(req) { + // peer channel is full + match err { + TrySendError::Full(req) | TrySendError::Closed(req) => { + // need to do some cleanup so + let req = req.into_get_pooled_transactions().expect("is get pooled tx"); + + // we know that the peer is the only entry in the map, so we can remove all + self.remove_from_unknown_hashes(req.0); + } + } + metrics_increment_egress_peer_channel_full(); + return Some(new_announced_hashes) + } else { + // remove requested hashes from buffered hashes + debug_assert!( + || -> bool { + for hash in &new_announced_hashes { + if self.buffered_hashes.contains(hash) { + return false + } + } + true + }(), + "broken invariant `buffered-hashes` and `unknown-hashes`" + ); + + // stores a new request future for the request + self.inflight_requests.push(GetPooledTxRequestFut::new( + peer_id, + new_announced_hashes, + rx, + )) + } + + None + } + + /// Tries to fill request so that the respective tx response is at its size limit. It does so + /// by taking buffered hashes for which peer is listed as fallback peer. If this is an eth68 + /// request, the accumulated size of transactions corresponding to parameter hashes, must also + /// be passed as parameter. + pub(super) fn fill_request_for_peer( + &mut self, + hashes: &mut Vec, + peer_id: PeerId, + mut acc_eth68_size: Option, + ) { + debug_assert!( + acc_eth68_size.is_none() || { + let mut acc_size = 0; + for &hash in hashes.iter() { + _ = self.include_eth68_hash(&mut acc_size, hash); + } + Some(acc_size) == acc_eth68_size + }, + "broken invariant `acc-eth68-size` and `hashes`" + ); + + for hash in self.buffered_hashes.iter() { + // if this request is eth68 txns, check the size metadata + if let Some(acc_size_response) = acc_eth68_size.as_mut() { + if *acc_size_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE { + trace!( + target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=format!("{hash:#}"), + size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"), + acc_size_response=acc_size_response, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "found buffered hash for peer but can't fit it into request" + ); + + break + } + if !self.include_eth68_hash(acc_size_response, *hash) { + trace!( + target: "net::tx", + peer_id=format!("{peer_id:#}"), + hash=format!("{hash:#}"), + size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"), + acc_size_response=acc_size_response, + MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE, + "found buffered hash for peer but can't fit it into request" + ); + + continue + } + // otherwise fill request based on hashes count + } else if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES { + break + } + + debug_assert!( + self.unknown_hashes.peek(hash).is_some(), + "broken invariant `buffered-hashes` and `unknown-hashes`" + ); + + if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) { + // upgrade this peer from fallback peer + if fallback_peers.remove(&peer_id) { + hashes.push(*hash) + } + } + } + + for hash in hashes { + self.buffered_hashes.remove(hash); + } + } +} + +impl Stream for TransactionFetcher { + type Item = FetchEvent; + + /// Advances all inflight requests and returns the next event. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + let res = this.inflight_requests.poll_next_unpin(cx); + + if let Poll::Ready(Some(response)) = res { + // update peer activity, requests for buffered hashes can only be made to idle + // fallback peers + self.update_peer_activity(&response); + + let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response; + + return match result { + Ok(Ok(transactions)) => { + // clear received hashes + requested_hashes.retain(|requested_hash| { + if transactions.hashes().any(|hash| hash == requested_hash) { + // hash is now known, stop tracking + self.unknown_hashes.remove(requested_hash); + self.eth68_meta.remove(requested_hash); + return false + } + true + }); + // buffer left over hashes + self.buffer_hashes_for_retry(requested_hashes); + + Poll::Ready(Some(FetchEvent::TransactionsFetched { + peer_id, + transactions: transactions.0, + })) + } + Ok(Err(req_err)) => { + self.buffer_hashes_for_retry(requested_hashes); + Poll::Ready(Some(FetchEvent::FetchError { peer_id, error: req_err })) + } + Err(_) => { + self.buffer_hashes_for_retry(requested_hashes); + // request channel closed/dropped + Poll::Ready(Some(FetchEvent::FetchError { + peer_id, + error: RequestError::ChannelClosed, + })) + } + } + } + + Poll::Pending + } +} + +impl Default for TransactionFetcher { + fn default() -> Self { + Self { + active_peers: LruMap::new(MAX_CONCURRENT_TX_REQUESTS), + inflight_requests: Default::default(), + buffered_hashes: LruCache::new( + NonZeroUsize::new(MAX_CAPACITY_BUFFERED_HASHES) + .expect("buffered cache limit should be non-zero"), + ), + unknown_hashes: LruMap::new_unlimited(), + eth68_meta: LruMap::new_unlimited(), + } + } +} + +/// Represents possible events from fetching transactions. +#[derive(Debug)] +pub(super) enum FetchEvent { + /// Triggered when transactions are successfully fetched. + TransactionsFetched { + /// The ID of the peer from which transactions were fetched. + peer_id: PeerId, + /// The transactions that were fetched, if available. + transactions: Vec, + }, + /// Triggered when there is an error in fetching transactions. + FetchError { + /// The ID of the peer from which an attempt to fetch transactions resulted in an error. + peer_id: PeerId, + /// The specific error that occurred while fetching. + error: RequestError, + }, +} + +/// An inflight request for `PooledTransactions` from a peer +pub(super) struct GetPooledTxRequest { + peer_id: PeerId, + /// Transaction hashes that were requested, for cleanup purposes + requested_hashes: Vec, + response: oneshot::Receiver>, +} + +pub(super) struct GetPooledTxResponse { + peer_id: PeerId, + /// Transaction hashes that were requested, for cleanup purposes + requested_hashes: Vec, + result: Result, RecvError>, +} + +#[must_use = "futures do nothing unless polled"] +#[pin_project::pin_project] +pub(super) struct GetPooledTxRequestFut { + #[pin] + inner: Option, +} + +impl GetPooledTxRequestFut { + #[inline] + fn new( + peer_id: PeerId, + requested_hashes: Vec, + response: oneshot::Receiver>, + ) -> Self { + Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } + } +} + +impl Future for GetPooledTxRequestFut { + type Output = GetPooledTxResponse; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut req = self.as_mut().project().inner.take().expect("polled after completion"); + match req.response.poll_unpin(cx) { + Poll::Ready(result) => Poll::Ready(GetPooledTxResponse { + peer_id: req.peer_id, + requested_hashes: req.requested_hashes, + result, + }), + Poll::Pending => { + self.project().inner.set(Some(req)); + Poll::Pending + } + } + } +} + +#[cfg(test)] +mod test { + use reth_primitives::B256; + + use crate::transactions::tests::default_cache; + + use super::*; + + #[test] + fn pack_eth68_request_surplus_hashes() { + reth_tracing::init_test_tracing(); + + let tx_fetcher = &mut TransactionFetcher::default(); + + let peer_id = PeerId::new([1; 64]); + + let eth68_hashes = [ + B256::from_slice(&[1; 32]), + B256::from_slice(&[2; 32]), + B256::from_slice(&[3; 32]), + B256::from_slice(&[4; 32]), + B256::from_slice(&[5; 32]), + B256::from_slice(&[6; 32]), + ]; + let eth68_hashes_sizes = [ + MAX_FULL_TRANSACTIONS_PACKET_SIZE - 4, + MAX_FULL_TRANSACTIONS_PACKET_SIZE, // this one will not fit + 2, // this one will fit + 3, // but now this one won't + 2, /* this one will, no more txns will fit + * after this */ + 1, + ]; + + // load unseen hashes + for i in 0..6 { + tx_fetcher.unknown_hashes.insert(eth68_hashes[i], (0, default_cache())); + tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]); + } + + let mut eth68_hashes_to_request = eth68_hashes.clone().to_vec(); + let surplus_eth68_hashes = + tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id); + + assert_eq!(surplus_eth68_hashes, vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5])); + assert_eq!( + eth68_hashes_to_request, + vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4]) + ); + } +} diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions/mod.rs similarity index 74% rename from crates/net/network/src/transactions.rs rename to crates/net/network/src/transactions/mod.rs index b3750b44486a..c549584c2cbf 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1,4 +1,31 @@ //! Transactions management for the p2p network. +//! +//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching +//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is +//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash +//! is already seen in a previous announcement. The hashes that remain from an announcement are +//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes +//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other +//! hand, space remains, hashes that the peer has previously announced are taken out of buffered +//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the +//! peer's session, this marks the peer as active with respect to +//! `fetcher::MAX_CONCURRENT_TX_REQUESTS_PER_PEER`. +//! +//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes` +//! pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is +//! polled, it checks if any of fallback peer is idle. If so, it packs a request for that peer, +//! filling it from the buffered hashes. It does so until there are no more idle peers or until +//! the hashes buffer is empty. +//! +//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are +//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request +//! resolves with partial success, that is some of the requested hashes are not in the response, +//! these are then buffered. +//! +//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip +//! protocol. This means it's unlikely, that a valid hash, will be buffered for very long +//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large +//! enough to buffer many hashes during network failure, to allow for recovery. use crate::{ cache::LruCache, @@ -7,7 +34,8 @@ use crate::{ metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, NetworkEvents, NetworkHandle, }; -use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt}; +use futures::{stream::FuturesUnordered, Future, StreamExt}; +use itertools::Itertools; use reth_eth_wire::{ EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, Transactions, @@ -33,31 +61,27 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; +use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::{debug, trace}; +mod fetcher; + +use fetcher::{FetchEvent, TransactionFetcher}; + /// Cache limit of transactions to keep track of for a single peer. const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10; /// Soft limit for NewPooledTransactions const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096; -/// The target size for the message of full transactions. +/// The target size for the message of full transactions in bytes. const MAX_FULL_TRANSACTIONS_PACKET_SIZE: usize = 100 * 1024; -/// Recommended soft limit for the number of hashes in a GetPooledTransactions message (8kb) -/// -/// -const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256; - /// Softlimit for the response size of a GetPooledTransactions message (2MB) const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit = GetPooledTransactionLimit::SizeSoftLimit(2 * 1024 * 1024); -/// How many peers we keep track of for each missing transaction. -const MAX_ALTERNATIVE_PEERS_PER_TX: usize = 3; - /// The future for inserting a function into the pool pub type PoolImportFuture = Pin> + Send + 'static>>; @@ -524,47 +548,202 @@ where return } - let mut num_already_seen = 0; + // get handle to peer's session, if the session is still active + let Some(peer) = self.peers.get_mut(&peer_id) else { + debug!( + peer_id=format!("{peer_id:#}"), + msg=?msg, + "discarding announcement from inactive peer" + ); + return + }; - if let Some(peer) = self.peers.get_mut(&peer_id) { - let mut hashes = msg.into_hashes(); - // keep track of the transactions the peer knows - for tx in hashes.iter().copied() { - if !peer.transactions.insert(tx) { - num_already_seen += 1; - } + // message version decides how hashes are packed + // if this is a eth68 message, store eth68 tx metadata + if let Some(eth68_msg) = msg.as_eth68() { + for (&hash, (_type, size)) in eth68_msg.metadata_iter() { + self.transaction_fetcher.eth68_meta.insert(hash, size); } + } + // extract hashes payload + let mut hashes = msg.into_hashes(); - self.pool.retain_unknown(&mut hashes); - - if hashes.is_empty() { - // nothing to request - return + // keep track of the transactions the peer knows + let mut num_already_seen = 0; + for tx in hashes.iter().copied() { + if !peer.transactions.insert(tx) { + num_already_seen += 1; } + } - // enforce recommended soft limit, however the peer may enforce an arbitrary limit on - // the response (2MB) - hashes.truncate(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES); + // filter out known hashes, those txns have already been successfully fetched + self.pool.retain_unknown(&mut hashes); + if hashes.is_empty() { + // nothing to request + return + } + // filter out already seen unknown hashes, for those hashes add the peer as fallback + let peers = &self.peers; + self.transaction_fetcher + .filter_unseen_hashes(&mut hashes, peer_id, |peer_id| peers.contains_key(&peer_id)); + if hashes.is_empty() { + // nothing to request + return + } - // request the missing transactions - let request_sent = - self.transaction_fetcher.request_transactions_from_peer(hashes, peer); - if !request_sent { - self.metrics.egress_peer_channel_full.increment(1); - return - } + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", hashes.iter().format(", ")), + "received previously unseen hashes in announcement from peer" + ); - if num_already_seen > 0 { - self.metrics.messages_with_already_seen_hashes.increment(1); - trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen hashes"); - } + // only send request for hashes to idle peer, otherwise buffer hashes storing peer as + // fallback + if !self.transaction_fetcher.is_idle(peer_id) { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", hashes.iter().format(", ")), + "buffering hashes announced by busy peer" + ); + + self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id)); + return + } + // demand recommended soft limit on response, however the peer may enforce an arbitrary + // limit on the response (2MB) + let surplus_hashes = self.transaction_fetcher.pack_hashes(&mut hashes, peer_id); + + if !surplus_hashes.is_empty() { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + surplus_hashes=format!("{surplus_hashes:#?}"), + "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" + ); + + self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id)); + } + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", hashes.iter().format(", ")), + "sending hashes in `GetPooledTransactions` request to peer's session" + ); + + // request the missing transactions + // + // get handle to peer's session again, at this point we know it exists + let Some(peer) = self.peers.get_mut(&peer_id) else { return }; + let metrics = &self.metrics; + if let Some(failed_to_request_hashes) = + self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || { + metrics.egress_peer_channel_full.increment(1) + }) + { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")), + "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" + ); + self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); + return } if num_already_seen > 0 { + self.metrics.messages_with_already_seen_hashes.increment(1); + trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen hashes"); self.report_already_seen(peer_id); } } + /// Tries to request hashes in buffer. + /// + /// If any idle fallback peer exists for any hash in the buffer, that hash is taken out of the + /// buffer and put into a request to that peer. Before sending, the request is filled with + /// additional hashes out of the buffer, for which the peer is listed as fallback, as long as + /// space remains in the respective transactions response. + fn request_buffered_hashes(&mut self) { + loop { + let mut hashes = vec![]; + let Some(peer_id) = self.pop_any_idle_peer(&mut hashes) else { return }; + + debug_assert!( + self.peers.contains_key(&peer_id), + "broken invariant `peers` and `transaction-fetcher`" + ); + + // fill the request with other buffered hashes that have been announced by the peer + let Some(peer) = self.peers.get(&peer_id) else { return }; + + let Some(hash) = hashes.first() else { return }; + let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied(); + self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size); + + trace!( + target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", hashes.iter().format(", ")), + "requesting buffered hashes from idle peer" + ); + + // request the buffered missing transactions + let metrics = &self.metrics; + if let Some(failed_to_request_hashes) = + self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || { + metrics.egress_peer_channel_full.increment(1) + }) + { + debug!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")), + "failed sending request to peer's session, buffering hashes" + ); + + self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); + return + } + } + } + + /// Returns any idle peer for any buffered unknown hash, and writes that hash to the request's + /// hashes buffer that is passed as parameter. + /// + /// Loops through the fallback peers of each buffered hashes, until an idle fallback peer is + /// found. As a side effect, dead fallback peers are filtered out for visited hashes. + fn pop_any_idle_peer(&mut self, hashes: &mut Vec) -> Option { + let mut ended_sessions = vec![]; + let mut buffered_hashes_iter = self.transaction_fetcher.buffered_hashes.iter(); + let peers = &self.peers; + + let idle_peer = loop { + let Some(&hash) = buffered_hashes_iter.next() else { break None }; + + let idle_peer = + self.transaction_fetcher.get_idle_peer_for(hash, &mut ended_sessions, |peer_id| { + peers.contains_key(&peer_id) + }); + for peer_id in ended_sessions.drain(..) { + let (_, peers) = self.transaction_fetcher.unknown_hashes.peek_mut(&hash)?; + _ = peers.remove(&peer_id); + } + if idle_peer.is_some() { + hashes.push(hash); + break idle_peer + } + }; + + let peer_id = &idle_peer?; + let hash = hashes.first()?; + + let (_, peers) = self.transaction_fetcher.unknown_hashes.get(hash)?; + // pop peer from fallback peers + _ = peers.remove(peer_id); + // pop hash that is loaded in request buffer from buffered hashes + drop(buffered_hashes_iter); + _ = self.transaction_fetcher.buffered_hashes.remove(hash); + + idle_peer + } + /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { @@ -583,7 +762,7 @@ where // mark the transactions as received self.transaction_fetcher.on_received_full_transactions_broadcast( - non_blob_txs.iter().map(|tx| tx.hash()), + non_blob_txs.iter().map(|tx| *tx.hash()), ); self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast); @@ -829,7 +1008,7 @@ where this.update_request_metrics(); // drain fetching transaction events - while let Poll::Ready(fetch_event) = this.transaction_fetcher.poll(cx) { + while let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) { match fetch_event { FetchEvent::TransactionsFetched { peer_id, transactions } => { this.import_transactions(peer_id, transactions, TransactionSource::Response); @@ -841,6 +1020,9 @@ where } } + // try drain buffered transactions + this.request_buffered_hashes(); + this.update_request_metrics(); this.update_import_metrics(); @@ -992,7 +1174,7 @@ impl PooledTransactionsHashesBuilder { enum TransactionSource { /// Transactions were broadcast to us via [`Transactions`] message. Broadcast, - /// Transactions were sent as the response of [`GetPooledTxRequest`] issued by us. + /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us. Response, } @@ -1005,58 +1187,6 @@ impl TransactionSource { } } -/// An inflight request for `PooledTransactions` from a peer -struct GetPooledTxRequest { - peer_id: PeerId, - /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: Vec, - response: oneshot::Receiver>, -} - -struct GetPooledTxResponse { - peer_id: PeerId, - /// Transaction hashes that were requested, for cleanup purposes - requested_hashes: Vec, - result: Result, RecvError>, -} - -#[must_use = "futures do nothing unless polled"] -#[pin_project::pin_project] -struct GetPooledTxRequestFut { - #[pin] - inner: Option, -} - -impl GetPooledTxRequestFut { - #[inline] - fn new( - peer_id: PeerId, - requested_hashes: Vec, - response: oneshot::Receiver>, - ) -> Self { - Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) } - } -} - -impl Future for GetPooledTxRequestFut { - type Output = GetPooledTxResponse; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut req = self.as_mut().project().inner.take().expect("polled after completion"); - match req.response.poll_unpin(cx) { - Poll::Ready(result) => Poll::Ready(GetPooledTxResponse { - peer_id: req.peer_id, - requested_hashes: req.requested_hashes, - result, - }), - Poll::Pending => { - self.project().inner.set(Some(req)); - Poll::Pending - } - } - } -} - /// Tracks a single peer #[derive(Debug)] struct Peer { @@ -1070,166 +1200,6 @@ struct Peer { client_version: Arc, } -/// The type responsible for fetching missing transactions from peers. -/// -/// This will keep track of unique transaction hashes that are currently being fetched and submits -/// new requests on announced hashes. -#[derive(Debug, Default)] -struct TransactionFetcher { - /// All currently active requests for pooled transactions. - inflight_requests: FuturesUnordered, - /// Set that tracks all hashes that are currently being fetched. - inflight_hash_to_fallback_peers: HashMap>, -} - -// === impl TransactionFetcher === - -impl TransactionFetcher { - /// Removes the specified hashes from inflight tracking. - #[inline] - fn remove_inflight_hashes<'a, I>(&mut self, hashes: I) - where - I: IntoIterator, - { - for &hash in hashes { - self.inflight_hash_to_fallback_peers.remove(&hash); - } - } - - /// Advances all inflight requests and returns the next event. - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { - if let Poll::Ready(Some(GetPooledTxResponse { peer_id, requested_hashes, result })) = - self.inflight_requests.poll_next_unpin(cx) - { - return match result { - Ok(Ok(transactions)) => { - // clear received hashes - self.remove_inflight_hashes(transactions.hashes()); - - // TODO: re-request missing hashes, for now clear all of them - self.remove_inflight_hashes(requested_hashes.iter()); - - Poll::Ready(FetchEvent::TransactionsFetched { - peer_id, - transactions: transactions.0, - }) - } - Ok(Err(req_err)) => { - // TODO: re-request missing hashes - self.remove_inflight_hashes(&requested_hashes); - Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err }) - } - Err(_) => { - // TODO: re-request missing hashes - self.remove_inflight_hashes(&requested_hashes); - // request channel closed/dropped - Poll::Ready(FetchEvent::FetchError { - peer_id, - error: RequestError::ChannelClosed, - }) - } - } - } - Poll::Pending - } - - /// Removes the provided transaction hashes from the inflight requests set. - /// - /// This is called when we receive full transactions that are currently scheduled for fetching. - #[inline] - fn on_received_full_transactions_broadcast<'a>( - &mut self, - hashes: impl IntoIterator, - ) { - self.remove_inflight_hashes(hashes) - } - - /// Requests the missing transactions from the announced hashes of the peer - /// - /// This filters all announced hashes that are already in flight, and requests the missing, - /// while marking the given peer as an alternative peer for the hashes that are already in - /// flight. - fn request_transactions_from_peer( - &mut self, - mut announced_hashes: Vec, - peer: &Peer, - ) -> bool { - let peer_id: PeerId = peer.request_tx.peer_id; - // 1. filter out inflight hashes, and register the peer as fallback for all inflight hashes - announced_hashes.retain(|&hash| { - match self.inflight_hash_to_fallback_peers.entry(hash) { - Entry::Vacant(entry) => { - // the hash is not in inflight hashes, insert it and retain in the vector - entry.insert(vec![peer_id]); - true - } - Entry::Occupied(mut entry) => { - // the hash is already in inflight, add this peer as a backup if not more than 3 - // backups already - let backups = entry.get_mut(); - if backups.len() < MAX_ALTERNATIVE_PEERS_PER_TX { - backups.push(peer_id); - } - false - } - } - }); - - // 2. request all missing from peer - if announced_hashes.is_empty() { - // nothing to request - return false - } - - let (response, rx) = oneshot::channel(); - let req: PeerRequest = PeerRequest::GetPooledTransactions { - request: GetPooledTransactions(announced_hashes.clone()), - response, - }; - - // try to send the request to the peer - if let Err(err) = peer.request_tx.try_send(req) { - // peer channel is full - match err { - TrySendError::Full(req) | TrySendError::Closed(req) => { - // need to do some cleanup so - let req = req.into_get_pooled_transactions().expect("is get pooled tx"); - - // we know that the peer is the only entry in the map, so we can remove all - for hash in req.0.into_iter() { - self.inflight_hash_to_fallback_peers.remove(&hash); - } - } - } - return false - } else { - //create a new request for it, from that peer - self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, announced_hashes, rx)) - } - - true - } -} - -/// Represents possible events from fetching transactions. -#[derive(Debug)] -enum FetchEvent { - /// Triggered when transactions are successfully fetched. - TransactionsFetched { - /// The ID of the peer from which transactions were fetched. - peer_id: PeerId, - /// The transactions that were fetched, if available. - transactions: Vec, - }, - /// Triggered when there is an error in fetching transactions. - FetchError { - /// The ID of the peer from which an attempt to fetch transactions resulted in an error. - peer_id: PeerId, - /// The specific error that occurred while fetching. - error: RequestError, - }, -} - /// Commands to send to the [`TransactionsManager`] #[derive(Debug)] enum TransactionsCommand { @@ -1288,6 +1258,7 @@ mod tests { use super::*; use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager}; use alloy_rlp::Decodable; + use futures::FutureExt; use reth_interfaces::sync::{NetworkSyncUpdater, SyncState}; use reth_network_api::NetworkInfo; use reth_primitives::hex; @@ -1295,7 +1266,51 @@ mod tests { use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; use secp256k1::SecretKey; - use std::future::poll_fn; + use std::{future::poll_fn, hash}; + + use fetcher::MAX_ALTERNATIVE_PEERS_PER_TX; + + async fn new_tx_manager() -> TransactionsManager { + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let client = NoopProvider::default(); + + let config = NetworkConfigBuilder::new(secret_key).disable_discovery().build(client); + + let pool = testing_pool(); + + let (_network_handle, _network, transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + + transactions + } + + pub(super) fn default_cache() -> LruCache { + let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).unwrap(); + LruCache::new(limit) + } + + // Returns (peer, channel-to-send-get-pooled-tx-response-on). + fn new_mock_session( + peer_id: PeerId, + version: EthVersion, + ) -> (Peer, mpsc::Receiver) { + let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1); + + ( + Peer { + transactions: default_cache(), + request_tx: PeerRequestSender::new(peer_id, to_mock_session_tx), + version, + client_version: Arc::from(""), + }, + to_mock_session_rx, + ) + } + #[tokio::test(flavor = "multi_thread")] async fn test_ignored_tx_broadcasts_while_initially_syncing() { reth_tracing::init_test_tracing(); @@ -1643,4 +1658,163 @@ mod tests { } } } + + #[tokio::test] + async fn max_retries_tx_request() { + reth_tracing::init_test_tracing(); + + let mut tx_manager = new_tx_manager().await; + let tx_fetcher = &mut tx_manager.transaction_fetcher; + + let peer_id_1 = PeerId::new([1; 64]); + let peer_id_2 = PeerId::new([2; 64]); + let eth_version = EthVersion::Eth66; + let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])]; + + let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version); + tx_manager.peers.insert(peer_id_1, peer_1); + + // hashes are seen and currently not inflight, with one fallback peer, and are buffered + // for first retry. + let retries = 1; + let mut backups = default_cache(); + backups.insert(peer_id_1); + tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups.clone())); + tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups)); + tx_fetcher.buffered_hashes.insert(seen_hashes[0]); + tx_fetcher.buffered_hashes.insert(seen_hashes[1]); + + // peer_1 is idle + assert!(tx_fetcher.is_idle(peer_id_1)); + + // sends request for buffered hashes to peer_1 + tx_manager.request_buffered_hashes(); + + let tx_fetcher = &mut tx_manager.transaction_fetcher; + + assert!(tx_fetcher.buffered_hashes.is_empty()); + // as long as request is in inflight peer_1 is not idle + assert!(!tx_fetcher.is_idle(peer_id_1)); + + // mock session of peer_1 receives request + let req = to_mock_session_rx + .recv() + .await + .expect("peer_1 session should receive request with buffered hashes"); + let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() }; + let GetPooledTransactions(hashes) = request; + + assert_eq!(hashes, seen_hashes); + + // fail request to peer_1 + response + .send(Err(RequestError::BadResponse)) + .expect("should send peer_1 response to tx manager"); + let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else { + unreachable!() + }; + + // request has resolved, peer_1 is idle again + assert!(tx_fetcher.is_idle(peer_id)); + // failing peer_1's request buffers requested hashes for retry + assert_eq!(tx_fetcher.buffered_hashes.len(), 2); + + let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version); + tx_manager.peers.insert(peer_id_2, peer_2); + + // peer_2 announces same hashes as peer_1 + let msg = + NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec())); + tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg); + + let tx_fetcher = &mut tx_manager.transaction_fetcher; + + // since hashes are already seen, no changes to length of unknown hashes + assert_eq!(tx_fetcher.unknown_hashes.len(), 2); + // but hashes are taken out of buffer and packed into request to peer_2 + assert!(tx_fetcher.buffered_hashes.is_empty()); + + // mock session of peer_2 receives request + let req = to_mock_session_rx + .recv() + .await + .expect("peer_2 session should receive request with buffered hashes"); + let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() }; + + // report failed request to tx manager + response + .send(Err(RequestError::BadResponse)) + .expect("should send peer_2 response to tx manager"); + let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() }; + + // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached however this time won't be + // buffered for retry + assert!(tx_fetcher.buffered_hashes.is_empty()); + } + + #[tokio::test] + async fn fill_eth68_request_for_peer() { + reth_tracing::init_test_tracing(); + + let mut tx_manager = new_tx_manager().await; + let tx_fetcher = &mut tx_manager.transaction_fetcher; + + let peer_id = PeerId::new([1; 64]); + let eth_version = EthVersion::Eth68; + let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])]; + let unseen_eth68_hashes_sizes = + [MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4]; + let seen_eth68_hashes = + [B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32])]; + let seen_eth68_hashes_sizes = [ + 5, + 3, // the second hash should be filled into the request because there is space for it + 4, // then there is no space for the last anymore + ]; + + // insert peer in tx manager + let (peer, _to_mock_session_rx) = new_mock_session(peer_id, eth_version); + tx_manager.peers.insert(peer_id, peer); + + // hashes are seen and currently not inflight, with one fallback peer, and are buffered + // for first try to fetch. + let mut backups = default_cache(); + backups.insert(peer_id); + for i in 0..3 { + tx_fetcher.unknown_hashes.insert(seen_eth68_hashes[i], (0, backups.clone())); + tx_fetcher.eth68_meta.insert(seen_eth68_hashes[i], seen_eth68_hashes_sizes[i]); + tx_fetcher.buffered_hashes.insert(seen_eth68_hashes[i]); + } + + let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version); + tx_manager.peers.insert(peer_id, peer); + + // peer announces previously unseen hashes + let msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 { + hashes: unseen_eth68_hashes.to_vec(), + sizes: unseen_eth68_hashes_sizes.to_vec(), + types: [0; 2].to_vec(), + }); + tx_manager.on_new_pooled_transaction_hashes(peer_id, msg); + + let tx_fetcher = &mut tx_manager.transaction_fetcher; + + // since hashes are unseen, length of unknown hashes increases + assert_eq!(tx_fetcher.unknown_hashes.len(), 5); + // seen_eth68_hashes[1] should be taken out of buffer and packed into request + assert_eq!(tx_fetcher.buffered_hashes.len(), 2); + assert!(tx_fetcher.buffered_hashes.contains(&seen_eth68_hashes[0])); + + // mock session of peer receives request + let req = to_mock_session_rx + .recv() + .await + .expect("peer session should receive request with buffered hashes"); + let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() }; + let GetPooledTransactions(hashes) = request; + + let mut expected_request = unseen_eth68_hashes.to_vec(); + expected_request.push(seen_eth68_hashes[1]); + assert_eq!(hashes, expected_request); + } }