diff --git a/crates/net/network/src/cache.rs b/crates/net/network/src/cache.rs index 80a8972b8b39..4acd66e14b96 100644 --- a/crates/net/network/src/cache.rs +++ b/crates/net/network/src/cache.rs @@ -52,6 +52,11 @@ impl LruCache { { self.inner.contains(value) } + + /// Returns an iterator over all cached entries + pub fn iter(&self) -> impl Iterator + '_ { + self.inner.iter() + } } impl Extend for LruCache diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index dd25a37e84a9..0db1d2318c7d 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -27,7 +27,7 @@ use reth_transaction_pool::{ PropagatedTransactions, TransactionPool, ValidPoolTransaction, }; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, num::NonZeroUsize, pin::Pin, sync::Arc, @@ -91,8 +91,10 @@ impl TransactionsHandle { } /// Request the active peer IDs from the [`TransactionsManager`]. - pub fn get_active_peers(&self) { - self.send(TransactionsCommand::GetActivePeers) + pub async fn get_active_peers(&self) -> Result, RecvError> { + let (tx, rx) = oneshot::channel(); + self.send(TransactionsCommand::GetActivePeers(tx)); + rx.await } /// Manually propagate full transactions to a specific peer. @@ -101,13 +103,22 @@ impl TransactionsHandle { } /// Request the transaction hashes known by specific peers. - pub fn get_transaction_hashes(&self, peers: Vec) { - self.send(TransactionsCommand::GetTransactionHashes(peers)) + pub async fn get_transaction_hashes( + &self, + peers: Vec, + ) -> Result>, RecvError> { + let (tx, rx) = oneshot::channel(); + self.send(TransactionsCommand::GetTransactionHashes { peers, tx }); + rx.await } /// Request the transaction hashes known by a specific peer. - pub fn get_peer_transaction_hashes(&self, peer: PeerId) { - self.send(TransactionsCommand::GetPeerTransactionHashes(peer)) + pub async fn get_peer_transaction_hashes( + &self, + peer: PeerId, + ) -> Result, RecvError> { + let res = self.get_transaction_hashes(vec![peer]).await?; + Ok(res.into_values().next().unwrap_or_default()) } } @@ -346,6 +357,53 @@ where propagated } + /// Propagate the full transactions to a specific peer + /// + /// Returns the propagated transactions + fn propagate_full_transactions_to_peer( + &mut self, + txs: Vec, + peer_id: PeerId, + ) -> Option { + let peer = self.peers.get_mut(&peer_id)?; + let mut propagated = PropagatedTransactions::default(); + trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer"); + + // filter all transactions unknown to the peer + let mut full_transactions = FullTransactionsBuilder::default(); + + let to_propagate = self + .pool + .get_all(txs) + .into_iter() + .filter(|tx| !tx.transaction.is_eip4844()) + .map(PropagateTransaction::new); + + // Iterate through the transactions to propagate and fill the hashes and full transaction + for tx in to_propagate { + if peer.transactions.insert(tx.hash()) { + full_transactions.push(&tx); + } + } + + if full_transactions.transactions.is_empty() { + // nothing to propagate + return None + } + + let new_full_transactions = full_transactions.build(); + for tx in new_full_transactions.iter() { + propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id)); + } + // send full transactions + self.network.send_transactions(peer_id, new_full_transactions); + + // Update propagated transactions metrics + self.metrics.propagated_transactions.increment(propagated.0.len() as u64); + + Some(propagated) + } + /// Propagate the transaction hashes to the given peer /// /// Note: This will only send the hashes for transactions that exist in the pool. @@ -493,10 +551,27 @@ where TransactionsCommand::PropagateHashesTo(hashes, peer) => { self.propagate_hashes_to(hashes, peer) } - TransactionsCommand::GetActivePeers => todo!(), - TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => todo!(), - TransactionsCommand::GetTransactionHashes(_peers) => todo!(), - TransactionsCommand::GetPeerTransactionHashes(_peer) => todo!(), + TransactionsCommand::GetActivePeers(tx) => { + let peers = self.peers.keys().copied().collect::>(); + tx.send(peers).ok(); + } + TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => { + if let Some(propagated) = self.propagate_full_transactions_to_peer(_txs, _peer) { + self.pool.on_propagated(propagated); + } + } + TransactionsCommand::GetTransactionHashes { peers, tx } => { + let mut res = HashMap::with_capacity(peers.len()); + for peer_id in peers { + let hashes = self + .peers + .get(&peer_id) + .map(|peer| peer.transactions.iter().copied().collect::>()) + .unwrap_or_default(); + res.insert(peer_id, hashes); + } + tx.send(res).ok(); + } } } @@ -929,13 +1004,14 @@ enum TransactionsCommand { /// Propagate transaction hashes to a specific peer. PropagateHashesTo(Vec, PeerId), /// Request the list of active peer IDs from the [`TransactionsManager`]. - GetActivePeers, + GetActivePeers(oneshot::Sender>), /// Propagate a collection of full transactions to a specific peer. PropagateTransactionsTo(Vec, PeerId), /// Request transaction hashes known by specific peers from the [`TransactionsManager`]. - GetTransactionHashes(Vec), - /// Request transaction hashes known by a specific peer from the [`TransactionsManager`]. - GetPeerTransactionHashes(PeerId), + GetTransactionHashes { + peers: Vec, + tx: oneshot::Sender>>, + }, } /// All events related to transactions emitted by the network.