diff --git a/bin/reth/src/cli/components.rs b/bin/reth/src/cli/components.rs index 2f27eb508145..8f45774688f7 100644 --- a/bin/reth/src/cli/components.rs +++ b/bin/reth/src/cli/components.rs @@ -1,5 +1,6 @@ //! Components that are used by the node command. +use reth_network::NetworkEvents; use reth_network_api::{NetworkInfo, Peers}; use reth_primitives::ChainSpec; use reth_provider::{ @@ -47,7 +48,7 @@ pub trait RethNodeComponents: Clone + Send + Sync + 'static { /// The transaction pool type type Pool: TransactionPool + Clone + Unpin + 'static; /// The network type used to communicate with p2p. - type Network: NetworkInfo + Peers + Clone + 'static; + type Network: NetworkInfo + Peers + NetworkEvents + Clone + 'static; /// The events type used to create subscriptions. type Events: CanonStateSubscriptions + Clone + 'static; /// The type that is used to spawn tasks. @@ -116,7 +117,7 @@ where Provider: FullProvider + Clone + 'static, Tasks: TaskSpawner + Clone + Unpin + 'static, Pool: TransactionPool + Clone + Unpin + 'static, - Network: NetworkInfo + Peers + Clone + 'static, + Network: NetworkInfo + Peers + NetworkEvents + Clone + 'static, Events: CanonStateSubscriptions + Clone + 'static, { type Provider = Provider; diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 49a58d81cb48..e35cd6df46ed 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -20,7 +20,7 @@ use reth_interfaces::{ consensus::Consensus, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, }; -use reth_network::NetworkHandle; +use reth_network::{NetworkEvents, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256}; use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader}; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 56508dc77447..2a54edd0bae7 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -49,7 +49,9 @@ use reth_interfaces::{ }, RethResult, }; -use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; +use reth_network::{ + error::NetworkError, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager, +}; use reth_network_api::{NetworkInfo, PeersInfo}; use reth_primitives::{ constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP}, diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 18e0e5fd6e73..6767a6d42b47 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -140,7 +140,7 @@ pub use discovery::{Discovery, DiscoveryEvent}; pub use fetch::FetchClient; pub use manager::{NetworkEvent, NetworkManager}; pub use message::PeerRequest; -pub use network::NetworkHandle; +pub use network::{NetworkEvents, NetworkHandle}; pub use peers::PeersConfig; pub use session::{ ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent, diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index eafe29b603d0..18a94dde83ba 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -83,22 +83,6 @@ impl NetworkHandle { &self.inner.to_manager_tx } - /// Creates a new [`NetworkEvent`] listener channel. - pub fn event_listener(&self) -> UnboundedReceiverStream { - let (tx, rx) = mpsc::unbounded_channel(); - let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); - UnboundedReceiverStream::new(rx) - } - - /// Returns a new [`DiscoveryEvent`] stream. - /// - /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. - pub fn discovery_listener(&self) -> UnboundedReceiverStream { - let (tx, rx) = mpsc::unbounded_channel(); - let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx)); - UnboundedReceiverStream::new(rx) - } - /// Returns a new [`FetchClient`] that can be cloned and shared. /// /// The [`FetchClient`] is the entrypoint for sending requests to the network. @@ -183,6 +167,20 @@ impl NetworkHandle { } } +impl NetworkEvents for NetworkHandle { + fn event_listener(&self) -> UnboundedReceiverStream { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.manager().send(NetworkHandleMessage::EventListener(tx)); + UnboundedReceiverStream::new(rx) + } + + fn discovery_listener(&self) -> UnboundedReceiverStream { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx)); + UnboundedReceiverStream::new(rx) + } +} + // === API Implementations === impl PeersInfo for NetworkHandle { @@ -339,6 +337,16 @@ struct NetworkInner { sequencer_endpoint: Option, } +/// Provides event subscription for the network. +pub trait NetworkEvents: Send + Sync { + /// Creates a new [`NetworkEvent`] listener channel. + fn event_listener(&self) -> UnboundedReceiverStream; + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + fn discovery_listener(&self) -> UnboundedReceiverStream; +} + /// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager). #[allow(missing_docs)] #[derive(Debug)] diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 7bc7e739e15d..2b011346de9a 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -5,7 +5,8 @@ use crate::{ error::NetworkError, eth_requests::EthRequestHandler, transactions::{TransactionsHandle, TransactionsManager}, - NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager, + NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle, + NetworkManager, }; use futures::{FutureExt, StreamExt}; use pin_project::pin_project; diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 9a98509d5bd9..7b35e846890f 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -5,7 +5,7 @@ use crate::{ manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, - NetworkHandle, + NetworkEvents, NetworkHandle, }; use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt}; use reth_eth_wire::{ diff --git a/crates/net/network/tests/it/big_pooled_txs_req.rs b/crates/net/network/tests/it/big_pooled_txs_req.rs index 81fd8b4a70e1..4f09d103a65d 100644 --- a/crates/net/network/tests/it/big_pooled_txs_req.rs +++ b/crates/net/network/tests/it/big_pooled_txs_req.rs @@ -2,7 +2,7 @@ use reth_eth_wire::{GetPooledTransactions, PooledTransactions}; use reth_interfaces::sync::{NetworkSyncUpdater, SyncState}; use reth_network::{ test_utils::{NetworkEventStream, Testnet}, - PeerRequest, + NetworkEvents, PeerRequest, }; use reth_network_api::{NetworkInfo, Peers}; use reth_primitives::{Signature, TransactionSigned, B256}; diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 1ad7ae27bb52..f6427f170361 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -12,7 +12,7 @@ use reth_interfaces::{ use reth_net_common::ban_list::BanList; use reth_network::{ test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT}, - NetworkConfigBuilder, NetworkEvent, NetworkManager, PeersConfig, + NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, PeersConfig, }; use reth_network_api::{NetworkInfo, Peers, PeersInfo}; use reth_primitives::{mainnet_nodes, HeadersDirection, NodeRecord, PeerId}; diff --git a/crates/net/network/tests/it/geth.rs b/crates/net/network/tests/it/geth.rs index 83fefb9bc082..473b1bef49f4 100644 --- a/crates/net/network/tests/it/geth.rs +++ b/crates/net/network/tests/it/geth.rs @@ -6,7 +6,7 @@ use ethers_core::{ use ethers_providers::Middleware; use reth_network::{ test_utils::{unused_tcp_and_udp_port, unused_tcp_udp, NetworkEventStream}, - NetworkConfig, NetworkManager, + NetworkConfig, NetworkEvents, NetworkManager, }; use reth_network_api::Peers; use reth_primitives::{ChainSpec, Genesis, PeerId, SealedHeader}; diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs index cfe7df275ddb..3ea46d189a44 100644 --- a/crates/net/network/tests/it/requests.rs +++ b/crates/net/network/tests/it/requests.rs @@ -5,7 +5,10 @@ use reth_interfaces::p2p::{ bodies::client::BodiesClient, headers::client::{HeadersClient, HeadersRequest}, }; -use reth_network::test_utils::{NetworkEventStream, Testnet}; +use reth_network::{ + test_utils::{NetworkEventStream, Testnet}, + NetworkEvents, +}; use reth_network_api::{NetworkInfo, Peers}; use reth_primitives::{ Block, BlockBody, Bytes, Header, HeadersDirection, Signature, Transaction, TransactionKind, diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 3fceb65fbb5e..c00036b08d42 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -4,7 +4,7 @@ use futures::StreamExt; use reth_eth_wire::{capability::Capability, EthVersion}; use reth_network::{ test_utils::{PeerConfig, Testnet}, - NetworkEvent, + NetworkEvent, NetworkEvents, }; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::test_utils::NoopProvider;