From b15a3d17365846d81cca3a1d422b787795f4f0fd Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 31 Jan 2024 23:51:13 +0100 Subject: [PATCH 1/5] Fix bug, rebuffer hashes that were received over broadcast --- .../net/network/src/transactions/fetcher.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 122868c8900d..9f729eb742c4 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -83,6 +83,7 @@ impl TransactionFetcher { for hash in hashes { self.unknown_hashes.remove(&hash); self.eth68_meta.remove(&hash); + self.buffered_hashes.remove(&hash); } } @@ -239,7 +240,9 @@ impl TransactionFetcher { surplus_hashes } - pub(super) fn buffer_hashes_for_retry(&mut self, hashes: impl IntoIterator) { + pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec) { + // Igt could be that the txns have been received over broadcast in the time being. + hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some()); self.buffer_hashes(hashes, None) } @@ -251,7 +254,7 @@ impl TransactionFetcher { hashes: impl IntoIterator, fallback_peer: Option, ) { - let mut max_retried_hashes = vec![]; + let mut max_retried_and_evicted_hashes = vec![]; for hash in hashes { // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 @@ -286,18 +289,17 @@ impl TransactionFetcher { "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash" ); - max_retried_hashes.push(hash); + max_retried_and_evicted_hashes.push(hash); continue; } *retries += 1; } if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) { - _ = self.unknown_hashes.remove(&evicted_hash); - _ = self.eth68_meta.remove(&evicted_hash); + max_retried_and_evicted_hashes.push(evicted_hash); } } - self.remove_from_unknown_hashes(max_retried_hashes); + self.remove_from_unknown_hashes(max_retried_and_evicted_hashes); } /// Removes the provided transaction hashes from the inflight requests set. @@ -682,15 +684,16 @@ impl Stream for TransactionFetcher { return match result { Ok(Ok(transactions)) => { // clear received hashes + let mut fetched = Vec::with_capacity(transactions.hashes().count()); requested_hashes.retain(|requested_hash| { if transactions.hashes().any(|hash| hash == requested_hash) { // hash is now known, stop tracking - self.unknown_hashes.remove(requested_hash); - self.eth68_meta.remove(requested_hash); + fetched.push(*requested_hash); return false } true }); + self.remove_from_unknown_hashes(fetched); // buffer left over hashes self.buffer_hashes_for_retry(requested_hashes); From db6d3d7e5ff3dfe73e262e98ab4786e95283c12b Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 1 Feb 2024 00:47:27 +0100 Subject: [PATCH 2/5] Fix typo --- crates/net/network/src/transactions/fetcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 9f729eb742c4..a9243643bd55 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -241,7 +241,7 @@ impl TransactionFetcher { } pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec) { - // Igt could be that the txns have been received over broadcast in the time being. + // It could be that the txns have been received over broadcast in the time being. hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some()); self.buffer_hashes(hashes, None) } From e82b2847d059cb115419e3a1950101f220200d5e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 1 Feb 2024 01:00:29 +0100 Subject: [PATCH 3/5] Buffer hashes instead of throw away upon failed channel --- .../net/network/src/transactions/fetcher.rs | 58 +++++++------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index a9243643bd55..bb57b5744dea 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -240,32 +240,20 @@ impl TransactionFetcher { surplus_hashes } - pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec) { - // It could be that the txns have been received over broadcast in the time being. - hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some()); + pub(super) fn buffer_hashes_for_retry(&mut self, hashes: Vec) { self.buffer_hashes(hashes, None) } /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be /// passed as `fallback_peer` parameter! Hashes that have been re-requested /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. - pub(super) fn buffer_hashes( - &mut self, - hashes: impl IntoIterator, - fallback_peer: Option, - ) { + pub(super) fn buffer_hashes(&mut self, mut hashes: Vec, fallback_peer: Option) { let mut max_retried_and_evicted_hashes = vec![]; - for hash in hashes { - // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 - debug_assert!( - self.unknown_hashes.peek(&hash).is_some(), - "`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, -`%hash`: {}, -`@self`: {:?}", - hash, self - ); + // It could be that the txns have been received over broadcast in the time being. + hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some()); + for hash in hashes { let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; if let Some(peer_id) = fallback_peer { @@ -434,6 +422,21 @@ impl TransactionFetcher { *inflight_count += 1; + debug_assert!( + || -> bool { + for hash in &new_announced_hashes { + if self.buffered_hashes.contains(hash) { + return false + } + } + true + }(), + "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`, +`%new_announced_hashes`: {:?}, +`@self`: {:?}", + new_announced_hashes, self + ); + let (response, rx) = oneshot::channel(); let req: PeerRequest = PeerRequest::GetPooledTransactions { request: GetPooledTransactions(new_announced_hashes.clone()), @@ -448,28 +451,11 @@ impl TransactionFetcher { // need to do some cleanup so let req = req.into_get_pooled_transactions().expect("is get pooled tx"); - // we know that the peer is the only entry in the map, so we can remove all - self.remove_from_unknown_hashes(req.0); + metrics_increment_egress_peer_channel_full(); + return Some(req.0) } } - metrics_increment_egress_peer_channel_full(); - return Some(new_announced_hashes) } else { - debug_assert!( - || -> bool { - for hash in &new_announced_hashes { - if self.buffered_hashes.contains(hash) { - return false - } - } - true - }(), - "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`, -`%new_announced_hashes`: {:?}, -`@self`: {:?}", - new_announced_hashes, self - ); - // stores a new request future for the request self.inflight_requests.push(GetPooledTxRequestFut::new( peer_id, From f4170559222ce36b62172b0316d82030f218913e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 1 Feb 2024 15:53:48 +0100 Subject: [PATCH 4/5] Only filter out known hashes in async code, and promote yet unknown hashes to front --- crates/net/network/src/transactions/fetcher.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index bb57b5744dea..638f2e5c620b 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -240,19 +240,19 @@ impl TransactionFetcher { surplus_hashes } - pub(super) fn buffer_hashes_for_retry(&mut self, hashes: Vec) { + pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec) { + // It could be that the txns have been received over broadcast in the time being. + hashes.retain(|hash| self.unknown_hashes.get(hash).is_some()); + self.buffer_hashes(hashes, None) } /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be /// passed as `fallback_peer` parameter! Hashes that have been re-requested /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped. - pub(super) fn buffer_hashes(&mut self, mut hashes: Vec, fallback_peer: Option) { + pub(super) fn buffer_hashes(&mut self, hashes: Vec, fallback_peer: Option) { let mut max_retried_and_evicted_hashes = vec![]; - // It could be that the txns have been received over broadcast in the time being. - hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some()); - for hash in hashes { let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; From 3b44dbfaa808840d19c2df8daef81d49160b1efd Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Thu, 1 Feb 2024 16:07:05 +0100 Subject: [PATCH 5/5] Re-insert debug-assert --- crates/net/network/src/transactions/fetcher.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 638f2e5c620b..423bcc49c6c2 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -254,6 +254,14 @@ impl TransactionFetcher { let mut max_retried_and_evicted_hashes = vec![]; for hash in hashes { + // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68 + debug_assert!( + self.unknown_hashes.peek(&hash).is_some(), + "`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`, +`%hash`: {hash}, +`@self`: {self:?}", + ); + let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return }; if let Some(peer_id) = fallback_peer {