Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make TransactionsManager Future impl generic over NetworkPrimitives #13115

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions crates/net/eth-wire-types/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Abstraction over primitive types in network messages.

use alloy_rlp::{Decodable, Encodable};
use reth_primitives_traits::{Block, BlockHeader};
use reth_primitives_traits::{Block, BlockHeader, SignedTransaction};
use std::fmt::Debug;

/// Abstraction over primitive types which might appear in network messages. See
Expand Down Expand Up @@ -62,17 +62,7 @@ pub trait NetworkPrimitives:
+ 'static;

/// The transaction type which peers return in `PooledTransactions` messages.
type PooledTransaction: TryFrom<Self::BroadcastedTransaction>
+ Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
type PooledTransaction: SignedTransaction + TryFrom<Self::BroadcastedTransaction> + 'static;

/// The transaction type which peers return in `GetReceipts` messages.
type Receipt: Encodable
Expand Down
39 changes: 21 additions & 18 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_primitives::PooledTransactionsElement;
use reth_primitives_traits::SignedTransaction;
use schnellru::ByLength;
#[cfg(debug_assertions)]
use smallvec::{smallvec, SmallVec};
Expand Down Expand Up @@ -895,16 +896,14 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
approx_capacity_get_pooled_transactions_req_eth66()
}
}
}

impl TransactionFetcher {
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
/// [`FetchEvent`], which will then be streamed by
/// [`TransactionsManager`](super::TransactionsManager).
pub fn on_resolved_get_pooled_transactions_request_fut(
&mut self,
response: GetPooledTxResponse,
) -> FetchEvent {
response: GetPooledTxResponse<N::PooledTransaction>,
) -> FetchEvent<N::PooledTransaction> {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
Expand Down Expand Up @@ -1026,8 +1025,8 @@ impl TransactionFetcher {
}
}

impl Stream for TransactionFetcher {
type Item = FetchEvent;
impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
type Item = FetchEvent<N::PooledTransaction>;

/// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -1176,18 +1175,18 @@ impl<T> Future for GetPooledTxRequestFut<T> {

/// Wrapper of unverified [`PooledTransactions`].
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions {
txns: PooledTransactions,
pub struct UnverifiedPooledTransactions<T> {
txns: PooledTransactions<T>,
}

/// [`PooledTransactions`] that have been successfully verified.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions {
txns: PooledTransactions,
pub struct VerifiedPooledTransactions<T> {
txns: PooledTransactions<T>,
}

impl DedupPayload for VerifiedPooledTransactions {
type Value = PooledTransactionsElement;
impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
type Value = T;

fn is_empty(&self) -> bool {
self.txns.is_empty()
Expand All @@ -1199,26 +1198,30 @@ impl DedupPayload for VerifiedPooledTransactions {

fn dedup(self) -> PartiallyValidData<Self::Value> {
PartiallyValidData::from_raw_data(
self.txns.into_iter().map(|tx| (*tx.hash(), tx)).collect(),
self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
None,
)
}
}

trait VerifyPooledTransactionsResponse {
type Transaction: SignedTransaction;

fn verify(
self,
requested_hashes: &RequestTxHashes,
peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions);
) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}

impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
type Transaction = T;

fn verify(
self,
requested_hashes: &RequestTxHashes,
_peer_id: &PeerId,
) -> (VerificationOutcome, VerifiedPooledTransactions) {
) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
let mut verification_outcome = VerificationOutcome::Ok;

let Self { mut txns } = self;
Expand All @@ -1229,11 +1232,11 @@ impl VerifyPooledTransactionsResponse for UnverifiedPooledTransactions {
let mut tx_hashes_not_requested_count = 0;

txns.0.retain(|tx| {
if !requested_hashes.contains(tx.hash()) {
if !requested_hashes.contains(tx.tx_hash()) {
verification_outcome = VerificationOutcome::ReportPeer;

#[cfg(debug_assertions)]
tx_hashes_not_requested.push(*tx.hash());
tx_hashes_not_requested.push(*tx.tx_hash());
#[cfg(not(debug_assertions))]
{
tx_hashes_not_requested_count += 1;
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ use reth_network_p2p::{
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElement, RecoveredTx,
TransactionSigned,
transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned,
};
use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream;
Expand Down Expand Up @@ -1307,11 +1306,17 @@ where
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<Pool> Future for TransactionsManager<Pool>
impl<Pool, N> Future for TransactionsManager<Pool, N>
where
Pool: TransactionPool + Unpin + 'static,
Pool::Transaction:
PoolTransaction<Consensus = TransactionSigned, Pooled: Into<PooledTransactionsElement>>,
N: NetworkPrimitives<
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{
type Output = ();

Expand Down
Loading