Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make limit on tx packet size soft in tx fetcher #6201

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};

use super::{Peer, PooledTransactions, MAX_FULL_TRANSACTIONS_PACKET_SIZE};
use super::{Peer, PooledTransactions, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT};

/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1;
Expand Down Expand Up @@ -175,8 +175,9 @@ impl TransactionFetcher {
if let Some(size) = self.eth68_meta.peek(&hash) {
let next_acc_size = *acc_size_response + size;

if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
// only update accumulated size of tx response if tx will fit in
if next_acc_size <= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
// only update accumulated size of tx response if tx will fit in without exceeding
// soft limit
*acc_size_response = next_acc_size;
return true
}
Expand All @@ -202,6 +203,14 @@ impl TransactionFetcher {
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
) -> Vec<TxHash> {
if let Some(hash) = hashes.first() {
if let Some(size) = self.eth68_meta.get(hash) {
if *size >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
return hashes.drain(1..).collect::<Vec<_>>()
}
}
}

let mut acc_size_response = 0;
let mut surplus_hashes = vec![];

Expand All @@ -213,7 +222,7 @@ impl TransactionFetcher {
hash=%hash,
size=self.eth68_meta.peek(&hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
"no space for hash in `GetPooledTransactions` request to peer"
);

Expand All @@ -224,7 +233,7 @@ impl TransactionFetcher {

// all hashes included in request and there is still space
// todo: compare free space with min tx size
if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE {
if acc_size_response < FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
self.fill_eth68_request_for_peer(hashes, peer_id, &mut acc_size_response);
}

Expand Down Expand Up @@ -489,6 +498,10 @@ impl TransactionFetcher {
peer_id: PeerId,
acc_size_response: &mut usize,
) {
if *acc_size_response >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
return
}

debug_assert!(
{
let mut acc_size = 0;
Expand All @@ -509,11 +522,11 @@ impl TransactionFetcher {
let mut next_acc_size = *acc_size_response;

// 1. Check acc size against limit, if so stop looping.
if next_acc_size >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
if next_acc_size >= FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
acc_size_eth68_response=acc_size_response, // no change acc size
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
"request to peer full"
);

Expand Down Expand Up @@ -555,7 +568,7 @@ impl TransactionFetcher {
peer_id=format!("{peer_id:#}"),
hash=%hash,
acc_size_eth68_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
"found buffered hash for request to peer"
);
}
Expand Down Expand Up @@ -616,7 +629,7 @@ impl TransactionFetcher {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=%hash,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT=FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT,
"found buffered hash for request to peer"
);
}
Expand Down Expand Up @@ -805,12 +818,12 @@ mod test {
B256::from_slice(&[6; 32]),
];
let eth68_hashes_sizes = [
MAX_FULL_TRANSACTIONS_PACKET_SIZE - 4,
MAX_FULL_TRANSACTIONS_PACKET_SIZE, // this one will not fit
2, // this one will fit
3, // but now this one won't
2, /* this one will, no more txns will fit
* after this */
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT - 4,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT, // this one will not fit
2, // this one will fit
3, // but now this one won't
2, /* this one will, no more txns will fit
* after this */
1,
];

Expand Down
18 changes: 10 additions & 8 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
/// Soft limit for NewPooledTransactions
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;

/// The target size for the message of full transactions in bytes.
const MAX_FULL_TRANSACTIONS_PACKET_SIZE: usize = 100 * 1024;
/// Soft limit for the message of full transactions in bytes.
const FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT: usize = 100 * 1024;

/// Softlimit for the response size of a GetPooledTransactions message (2MB)
const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
Expand Down Expand Up @@ -1112,7 +1112,7 @@ impl PropagateTransaction {
}

/// Helper type for constructing the full transaction message that enforces the
/// `MAX_FULL_TRANSACTIONS_PACKET_SIZE`
/// `FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT`
#[derive(Default)]
struct FullTransactionsBuilder {
total_size: usize,
Expand All @@ -1122,10 +1122,10 @@ struct FullTransactionsBuilder {
// === impl FullTransactionsBuilder ===

impl FullTransactionsBuilder {
/// Append a transaction to the list if it doesn't exceed the maximum size.
/// Append a transaction to the list if it doesn't exceed the maximum target size.
fn push(&mut self, transaction: &PropagateTransaction) {
let new_size = self.total_size + transaction.size;
if new_size > MAX_FULL_TRANSACTIONS_PACKET_SIZE {
if new_size > FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT {
return
}

Expand Down Expand Up @@ -1789,8 +1789,10 @@ mod tests {
let peer_id = PeerId::new([1; 64]);
let eth_version = EthVersion::Eth68;
let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
let unseen_eth68_hashes_sizes =
[MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4];
let unseen_eth68_hashes_sizes = [
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT / 2,
FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT / 2 - 4,
];
// hashes and sizes to buffer in reverse order so that seen_eth68_hashes[0] and
// seen_eth68_hashes_sizes[0] are lru
let seen_eth68_hashes =
Expand Down Expand Up @@ -1826,7 +1828,7 @@ mod tests {
let mut backups = default_cache();
backups.insert(peer_id_other);
tx_fetcher.unknown_hashes.insert(hash_other, (0, backups));
tx_fetcher.eth68_meta.insert(hash_other, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 2); // a big tx
tx_fetcher.eth68_meta.insert(hash_other, FULL_TRANSACTIONS_PACKET_SIZE_SOFT_LIMIT - 2); // a big tx
tx_fetcher.buffered_hashes.insert(hash_other);

let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version);
Expand Down
Loading