Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug, rebuffer hashes that were received over broadcast #6316

Merged
merged 5 commits into from
Feb 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 32 additions & 35 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl TransactionFetcher {
for hash in hashes {
self.unknown_hashes.remove(&hash);
self.eth68_meta.remove(&hash);
self.buffered_hashes.remove(&hash);
}
}

Expand Down Expand Up @@ -239,28 +240,26 @@ impl TransactionFetcher {
surplus_hashes
}

pub(super) fn buffer_hashes_for_retry(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec<TxHash>) {
// It could be that the txns have been received over broadcast in the time being.
hashes.retain(|hash| self.unknown_hashes.get(hash).is_some());

self.buffer_hashes(hashes, None)
}

/// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
/// passed as `fallback_peer` parameter! Hashes that have been re-requested
/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped.
pub(super) fn buffer_hashes(
&mut self,
hashes: impl IntoIterator<Item = TxHash>,
fallback_peer: Option<PeerId>,
) {
let mut max_retried_hashes = vec![];
pub(super) fn buffer_hashes(&mut self, hashes: Vec<TxHash>, fallback_peer: Option<PeerId>) {
let mut max_retried_and_evicted_hashes = vec![];

for hash in hashes {
// todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68
debug_assert!(
self.unknown_hashes.peek(&hash).is_some(),
"`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
`%hash`: {},
`@self`: {:?}",
hash, self
`%hash`: {hash},
`@self`: {self:?}",
);

let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return };
Expand All @@ -286,18 +285,17 @@ impl TransactionFetcher {
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
);

max_retried_hashes.push(hash);
max_retried_and_evicted_hashes.push(hash);
continue;
}
*retries += 1;
}
if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) {
_ = self.unknown_hashes.remove(&evicted_hash);
_ = self.eth68_meta.remove(&evicted_hash);
max_retried_and_evicted_hashes.push(evicted_hash);
}
}

self.remove_from_unknown_hashes(max_retried_hashes);
self.remove_from_unknown_hashes(max_retried_and_evicted_hashes);
}

/// Removes the provided transaction hashes from the inflight requests set.
Expand Down Expand Up @@ -432,6 +430,21 @@ impl TransactionFetcher {

*inflight_count += 1;

debug_assert!(
|| -> bool {
for hash in &new_announced_hashes {
if self.buffered_hashes.contains(hash) {
return false
}
}
true
}(),
"`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
`%new_announced_hashes`: {:?},
`@self`: {:?}",
new_announced_hashes, self
);

let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(new_announced_hashes.clone()),
Expand All @@ -446,28 +459,11 @@ impl TransactionFetcher {
// need to do some cleanup so
let req = req.into_get_pooled_transactions().expect("is get pooled tx");

// we know that the peer is the only entry in the map, so we can remove all
self.remove_from_unknown_hashes(req.0);
metrics_increment_egress_peer_channel_full();
return Some(req.0)
}
}
metrics_increment_egress_peer_channel_full();
return Some(new_announced_hashes)
} else {
debug_assert!(
|| -> bool {
for hash in &new_announced_hashes {
if self.buffered_hashes.contains(hash) {
return false
}
}
true
}(),
"`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
`%new_announced_hashes`: {:?},
`@self`: {:?}",
new_announced_hashes, self
);

// stores a new request future for the request
self.inflight_requests.push(GetPooledTxRequestFut::new(
peer_id,
Expand Down Expand Up @@ -682,15 +678,16 @@ impl Stream for TransactionFetcher {
return match result {
Ok(Ok(transactions)) => {
// clear received hashes
let mut fetched = Vec::with_capacity(transactions.hashes().count());
requested_hashes.retain(|requested_hash| {
if transactions.hashes().any(|hash| hash == requested_hash) {
// hash is now known, stop tracking
self.unknown_hashes.remove(requested_hash);
self.eth68_meta.remove(requested_hash);
fetched.push(*requested_hash);
return false
}
true
});
self.remove_from_unknown_hashes(fetched);
// buffer left over hashes
self.buffer_hashes_for_retry(requested_hashes);

Expand Down
Loading