Buffer hashes instead of throw away upon failed channel
emhane committed Feb 1, 2024
1 parent db6d3d7 commit e82b284
Showing 1 changed file with 22 additions and 36 deletions.
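In short: when a request for announced hashes cannot be sent, the hashes are now handed back and buffered for retry instead of being removed from `unknown_hashes`, and the filter for hashes that have meanwhile arrived over broadcast moves from `buffer_hashes_for_retry` into `buffer_hashes` itself. Below is a minimal, self-contained sketch of that retry-buffering logic; `Fetcher`, `TxHash`, and the plain `HashMap`/`VecDeque` fields are simplified stand-ins for reth's `TransactionFetcher`, not its actual types.

```rust
use std::collections::{HashMap, VecDeque};

type TxHash = [u8; 32];

/// Cap on how many times a hash is re-requested before it is dropped for good.
const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2;

#[derive(Default)]
struct Fetcher {
    /// Announced hashes that have not been fetched yet, mapped to a retry counter.
    /// A hash leaves this map once the transaction arrives, e.g. over broadcast.
    unknown_hashes: HashMap<TxHash, u8>,
    /// Hashes waiting to be re-requested.
    buffered_hashes: VecDeque<TxHash>,
}

impl Fetcher {
    /// Called with the hashes of a request that could not be sent or answered.
    fn buffer_hashes_for_retry(&mut self, hashes: Vec<TxHash>) {
        self.buffer_hashes(hashes)
    }

    fn buffer_hashes(&mut self, mut hashes: Vec<TxHash>) {
        // The transactions may have been received over broadcast in the meantime,
        // so only keep hashes that are still unknown.
        hashes.retain(|hash| self.unknown_hashes.contains_key(hash));

        for hash in hashes {
            let Some(retries) = self.unknown_hashes.get(&hash).copied() else { continue };
            if retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
                // Re-requested too many times: evict instead of buffering again.
                self.unknown_hashes.remove(&hash);
                continue
            }
            self.unknown_hashes.insert(hash, retries + 1);
            self.buffered_hashes.push_back(hash);
        }
    }
}

fn main() {
    let mut fetcher = Fetcher::default();
    let hash = [1u8; 32];
    fetcher.unknown_hashes.insert(hash, 0);

    // A request carrying this hash failed (e.g. the peer's channel was full);
    // buffer the hash for retry instead of throwing it away.
    fetcher.buffer_hashes_for_retry(vec![hash]);
    assert_eq!(fetcher.buffered_hashes.len(), 1);
    assert!(fetcher.unknown_hashes.contains_key(&hash));
}
```

Hashes that have already been re-requested the maximum number of times are evicted rather than buffered again, matching the doc comment on `buffer_hashes` in the diff below.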
crates/net/network/src/transactions/fetcher.rs (22 additions, 36 deletions)

@@ -240,32 +240,20 @@ impl TransactionFetcher
         surplus_hashes
     }

-    pub(super) fn buffer_hashes_for_retry(&mut self, mut hashes: Vec<TxHash>) {
-        // It could be that the txns have been received over broadcast in the time being.
-        hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some());
+    pub(super) fn buffer_hashes_for_retry(&mut self, hashes: Vec<TxHash>) {
         self.buffer_hashes(hashes, None)
     }

     /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
     /// passed as `fallback_peer` parameter! Hashes that have been re-requested
     /// [`MAX_REQUEST_RETRIES_PER_TX_HASH`], are dropped.
-    pub(super) fn buffer_hashes(
-        &mut self,
-        hashes: impl IntoIterator<Item = TxHash>,
-        fallback_peer: Option<PeerId>,
-    ) {
+    pub(super) fn buffer_hashes(&mut self, mut hashes: Vec<TxHash>, fallback_peer: Option<PeerId>) {
         let mut max_retried_and_evicted_hashes = vec![];

-        for hash in hashes {
-            // todo: enforce by adding new types UnknownTxHash66 and UnknownTxHash68
-            debug_assert!(
-                self.unknown_hashes.peek(&hash).is_some(),
-                "`%hash` in `@buffered_hashes` that's not in `@unknown_hashes`, `@buffered_hashes` should be a subset of keys in `@unknown_hashes`, broken invariant `@buffered_hashes` and `@unknown_hashes`,
-`%hash`: {},
-`@self`: {:?}",
-                hash, self
-            );
+        // It could be that the txns have been received over broadcast in the time being.
+        hashes.retain(|hash| self.unknown_hashes.peek(hash).is_some());

+        for hash in hashes {
             let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return };

             if let Some(peer_id) = fallback_peer {
@@ -434,6 +422,21 @@ impl TransactionFetcher

         *inflight_count += 1;

+        debug_assert!(
+            || -> bool {
+                for hash in &new_announced_hashes {
+                    if self.buffered_hashes.contains(hash) {
+                        return false
+                    }
+                }
+                true
+            }(),
+            "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
+`%new_announced_hashes`: {:?},
+`@self`: {:?}",
+            new_announced_hashes, self
+        );
+
         let (response, rx) = oneshot::channel();
         let req: PeerRequest = PeerRequest::GetPooledTransactions {
             request: GetPooledTransactions(new_announced_hashes.clone()),
@@ -448,28 +451,11 @@
                     // need to do some cleanup so
                     let req = req.into_get_pooled_transactions().expect("is get pooled tx");

-                    // we know that the peer is the only entry in the map, so we can remove all
-                    self.remove_from_unknown_hashes(req.0);
-                    metrics_increment_egress_peer_channel_full();
-                    return Some(req.0)
                 }
             }
+            metrics_increment_egress_peer_channel_full();
+            return Some(new_announced_hashes)
         } else {
-            debug_assert!(
-                || -> bool {
-                    for hash in &new_announced_hashes {
-                        if self.buffered_hashes.contains(hash) {
-                            return false
-                        }
-                    }
-                    true
-                }(),
-                "`%new_announced_hashes` should been taken out of buffer before packing in a request, breaks invariant `@buffered_hashes` and `@inflight_requests`,
-`%new_announced_hashes`: {:?},
-`@self`: {:?}",
-                new_announced_hashes, self
-            );
-
             // stores a new request future for the request
             self.inflight_requests.push(GetPooledTxRequestFut::new(
                 peer_id,
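The failed-channel path in the last hunk reduces to: `try_send` on the peer's request channel fails because the channel is full (or closed), the hashes are recovered from the rejected request, and they are returned to the caller for buffering rather than being dropped from `unknown_hashes`. Below is a minimal sketch of that pattern over a bounded std channel; `Peer`, `MockRequest`, and `send_request` are illustrative stand-ins, not reth's API.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

type TxHash = [u8; 32];

struct MockRequest(Vec<TxHash>);

struct Peer {
    request_tx: SyncSender<MockRequest>,
}

/// Hands the hashes back to the caller if the peer's channel is full or disconnected.
fn send_request(peer: &Peer, hashes: Vec<TxHash>) -> Option<Vec<TxHash>> {
    match peer.request_tx.try_send(MockRequest(hashes)) {
        Ok(()) => None,
        // A full or disconnected channel returns the rejected request, so the hashes
        // can be recovered and handed back instead of being thrown away.
        Err(TrySendError::Full(req)) | Err(TrySendError::Disconnected(req)) => Some(req.0),
    }
}

fn main() {
    // Bounded channel with capacity 1, pre-filled so the next `try_send` returns `Full`.
    let (tx, _rx) = sync_channel(1);
    assert!(tx.try_send(MockRequest(vec![])).is_ok());
    let peer = Peer { request_tx: tx };

    if let Some(failed) = send_request(&peer, vec![[1u8; 32]]) {
        // This is where the fetcher would pass the hashes to `buffer_hashes_for_retry`.
        println!("buffering {} hashes for retry", failed.len());
    }
}
```

In the fetcher, the returned hashes then go through `buffer_hashes_for_retry`, shown in the first hunk above.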
