Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add trait forNetworkEvents #5354

Merged
merged 4 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bin/reth/src/cli/components.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Components that are used by the node command.

use reth_network::NetworkEvents;
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::ChainSpec;
use reth_provider::{
Expand Down Expand Up @@ -47,7 +48,7 @@ pub trait RethNodeComponents: Clone + Send + Sync + 'static {
/// The transaction pool type
type Pool: TransactionPool + Clone + Unpin + 'static;
/// The network type used to communicate with p2p.
type Network: NetworkInfo + Peers + Clone + 'static;
type Network: NetworkInfo + Peers + NetworkEvents + Clone + 'static;
/// The events type used to create subscriptions.
type Events: CanonStateSubscriptions + Clone + 'static;
/// The type that is used to spawn tasks.
Expand Down Expand Up @@ -116,7 +117,7 @@ where
Provider: FullProvider + Clone + 'static,
Tasks: TaskSpawner + Clone + Unpin + 'static,
Pool: TransactionPool + Clone + Unpin + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Network: NetworkInfo + Peers + NetworkEvents + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
type Provider = Provider;
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reth_interfaces::{
consensus::Consensus,
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
};
use reth_network::NetworkHandle;
use reth_network::{NetworkEvents, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256};
use reth_provider::{BlockExecutionWriter, ProviderFactory, StageCheckpointReader};
Expand Down
4 changes: 3 additions & 1 deletion bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use reth_interfaces::{
},
RethResult,
};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvents, NetworkHandle, NetworkManager,
};
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_primitives::{
constants::eip4844::{LoadKzgSettingsError, MAINNET_KZG_TRUSTED_SETUP},
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub use discovery::{Discovery, DiscoveryEvent};
pub use fetch::FetchClient;
pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::NetworkHandle;
pub use network::{NetworkEvents, NetworkHandle};
pub use peers::PeersConfig;
pub use session::{
ActiveSessionHandle, ActiveSessionMessage, Direction, PeerInfo, PendingSessionEvent,
Expand Down
40 changes: 24 additions & 16 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,6 @@ impl NetworkHandle {
&self.inner.to_manager_tx
}

/// Creates a new [`NetworkEvent`] listener channel.
pub fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
}

/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
pub fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
UnboundedReceiverStream::new(rx)
}

/// Returns a new [`FetchClient`] that can be cloned and shared.
///
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
Expand Down Expand Up @@ -183,6 +167,20 @@ impl NetworkHandle {
}
}

impl NetworkEvents for NetworkHandle {
fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
}

fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
UnboundedReceiverStream::new(rx)
}
}

// === API Implementations ===

impl PeersInfo for NetworkHandle {
Expand Down Expand Up @@ -339,6 +337,16 @@ struct NetworkInner {
sequencer_endpoint: Option<String>,
}

/// Provides event subscription for the network.
pub trait NetworkEvents: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> UnboundedReceiverStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}

/// Internal messages that can be passed to the [`NetworkManager`](crate::NetworkManager).
#[allow(missing_docs)]
#[derive(Debug)]
Expand Down
3 changes: 2 additions & 1 deletion crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::{
error::NetworkError,
eth_requests::EthRequestHandler,
transactions::{TransactionsHandle, TransactionsManager},
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager,
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkHandle,
NetworkManager,
};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
NetworkEvents, NetworkHandle,
};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
use reth_eth_wire::{
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/big_pooled_txs_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use reth_eth_wire::{GetPooledTransactions, PooledTransactions};
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState};
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
PeerRequest,
NetworkEvents, PeerRequest,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::{Signature, TransactionSigned, B256};
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_interfaces::{
use reth_net_common::ban_list::BanList;
use reth_network::{
test_utils::{enr_to_peer_id, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT},
NetworkConfigBuilder, NetworkEvent, NetworkManager, PeersConfig,
NetworkConfigBuilder, NetworkEvent, NetworkEvents, NetworkManager, PeersConfig,
};
use reth_network_api::{NetworkInfo, Peers, PeersInfo};
use reth_primitives::{mainnet_nodes, HeadersDirection, NodeRecord, PeerId};
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use ethers_core::{
use ethers_providers::Middleware;
use reth_network::{
test_utils::{unused_tcp_and_udp_port, unused_tcp_udp, NetworkEventStream},
NetworkConfig, NetworkManager,
NetworkConfig, NetworkEvents, NetworkManager,
};
use reth_network_api::Peers;
use reth_primitives::{ChainSpec, Genesis, PeerId, SealedHeader};
Expand Down
5 changes: 4 additions & 1 deletion crates/net/network/tests/it/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use reth_interfaces::p2p::{
bodies::client::BodiesClient,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_network::test_utils::{NetworkEventStream, Testnet};
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
NetworkEvents,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::{
Block, BlockBody, Bytes, Header, HeadersDirection, Signature, Transaction, TransactionKind,
Expand Down
2 changes: 1 addition & 1 deletion crates/net/network/tests/it/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::StreamExt;
use reth_eth_wire::{capability::Capability, EthVersion};
use reth_network::{
test_utils::{PeerConfig, Testnet},
NetworkEvent,
NetworkEvent, NetworkEvents,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::test_utils::NoopProvider;
Expand Down