Skip to content

Commit

Permalink
Merge pull request #2767 from RolandSherwin/relay_met
Browse files Browse the repository at this point in the history
improvement of relay related metrics; remove relay clients if we have no more connections
  • Loading branch information
jacderida authored Feb 21, 2025
2 parents 943a94d + 97aa081 commit 74ea969
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 90 deletions.
13 changes: 3 additions & 10 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,8 @@ impl SwarmDriver {
sender,
} => {
cmd_string = "GetLocalQuotingMetrics";
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);
let (quoting_metrics, is_already_stored) = match self
.swarm
.behaviour_mut()
Expand All @@ -644,7 +637,7 @@ impl SwarmDriver {
&key,
data_type,
data_size,
Some(estimated_network_size as u64),
Some(kbucket_status.estimated_network_size as u64),
) {
Ok(res) => res,
Err(err) => {
Expand Down
20 changes: 8 additions & 12 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,24 +841,20 @@ impl SwarmDriver {
}
_ = set_farthest_record_interval.tick() => {
if !self.is_client {
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
if estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {estimated_network_size}, with {peers_in_non_full_buckets} peers_in_non_full_buckets and {num_of_full_buckets}num_of_full_buckets.");
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);
if kbucket_status.estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {}, with {} peers_in_non_full_buckets and {} num_of_full_buckets.",
kbucket_status.estimated_network_size,
kbucket_status.peers_in_non_full_buckets,
kbucket_status.num_of_full_buckets);
continue;
}
// The entire Distance space is U256
// (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935)
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let density = U256::MAX / U256::from(kbucket_status.estimated_network_size);
let density_distance = density * U256::from(CLOSE_GROUP_SIZE);

// Use distance to close peer to avoid the situation that
Expand Down
2 changes: 1 addition & 1 deletion ant-networking/src/event/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl SwarmDriver {
.collect(),
};

let is_relayed_peer = is_a_relayed_peer(&addrs);
let is_relayed_peer = is_a_relayed_peer(addrs.iter());

let is_bootstrap_peer = self
.bootstrap_peers
Expand Down
148 changes: 90 additions & 58 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod kad;
mod request_response;
mod swarm;

use crate::{driver::SwarmDriver, error::Result};
use crate::{driver::SwarmDriver, error::Result, relay_manager::is_a_relayed_peer};
use core::fmt;
use custom_debug::Debug as CustomDebug;
use libp2p::{
Expand All @@ -37,8 +37,32 @@ use std::{
};
use tokio::sync::oneshot;

// (total_buckets, total_peers, peers_in_non_full_buckets, num_of_full_buckets, kbucket_table_stats)
type KBucketStatus = (usize, usize, usize, usize, Vec<(usize, usize, u32)>);
/// Snapshot of the Kademlia kbucket table, produced by `SwarmDriver::get_kbuckets_status`.
///
/// Replaces the previous positional tuple so that callers read the values by name
/// (e.g. `kbucket_status.estimated_network_size`) instead of destructuring by position.
#[derive(Debug, Clone)]
pub(crate) struct KBucketStatus {
/// Number of kbuckets that were iterated while building this snapshot.
pub(crate) total_buckets: usize,
/// Total number of peers counted in the routing table; `update_on_kbucket_status`
/// copies this value into `SwarmDriver::peers_in_rt`.
// NOTE(review): presumably the sum of entries over all buckets; the accumulation
// is not visible in this view -- confirm.
pub(crate) total_peers: usize,
/// Number of routing table entries for which `is_a_relayed_peer` returned true.
pub(crate) total_relay_peers: usize,
/// Number of peers that sit in buckets that are not full; one of the two inputs to
/// `estimate_network_size`.
// NOTE(review): this can presumably be 0 (e.g. empty table); it is used as the divisor
// for the relay peers percentage metric, so a guard may be needed there -- confirm.
pub(crate) peers_in_non_full_buckets: usize,
/// Number of relayed peers found in buckets holding fewer than `K_VALUE` entries.
pub(crate) relay_peers_in_non_full_buckets: usize,
/// Number of buckets holding at least `K_VALUE` entries.
pub(crate) num_of_full_buckets: usize,
/// Per-bucket statistics, one tuple per bucket; only used for logging in `log`.
// NOTE(review): the meaning of each tuple element is defined where the vector is
// populated, which is not visible in this view -- TODO confirm and document.
pub(crate) kbucket_table_stats: Vec<(usize, usize, u32)>,
/// Result of `estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets)`.
pub(crate) estimated_network_size: usize,
}

impl KBucketStatus {
/// Emit a one-line `info!` summary of this snapshot: bucket count, peer count,
/// relay peer count, the per-bucket stats and the estimated network size.
///
/// When the `loud` feature is enabled the estimated network size is additionally
/// printed to stdout.
pub(crate) fn log(&self) {
info!(
"kBucketTable has {:?} kbuckets {:?} peers ({} relay peers), {:?}, estimated network size: {:?}",
self.total_buckets,
self.total_peers,
self.total_relay_peers,
self.kbucket_table_stats,
self.estimated_network_size
);
// Console output is opt-in so that regular builds only write to the log.
#[cfg(feature = "loud")]
println!("Estimated network size: {:?}", self.estimated_network_size);
}
}

/// NodeEvent enum
#[derive(CustomDebug)]
Expand Down Expand Up @@ -239,76 +263,86 @@ impl SwarmDriver {

/// Update state on addition of a peer to the routing table.
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
self.peers_in_rt = self.peers_in_rt.saturating_add(1);
let n_peers = self.peers_in_rt;
info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);

let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(added_peer));
info!("New peer added to routing table: {added_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
self.peers_in_rt, distance.ilog2());

#[cfg(feature = "loud")]
println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");
println!(
"New peer added to routing table: {added_peer:?}, now we have #{} connected peers",
self.peers_in_rt
);

kbucket_status.log();

if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
for addr in addresses.iter() {
bootstrap_cache.add_addr(addr.clone());
}
}

self.log_kbuckets(&added_peer);
self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));

#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(self.peers_in_rt as i64);
}
}

/// Update state on removal of a peer from the routing table.
pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) {
self.peers_in_rt = self.peers_in_rt.saturating_sub(1);
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);

// ensure we disconnect bad peer
// err result just means no connections were open
let _result = self.swarm.disconnect_peer_id(removed_peer);

let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(removed_peer));
info!(
"Peer removed from routing table: {removed_peer:?}, now we have #{} connected peers",
self.peers_in_rt
"Peer removed from routing table: {removed_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
self.peers_in_rt, distance.ilog2()
);
self.log_kbuckets(&removed_peer);

self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt));

kbucket_status.log();

#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(self.peers_in_rt as i64);
}
}

/// Collect kbuckets status
pub(crate) fn kbuckets_status(&mut self) -> KBucketStatus {
/// Get the status of the kbucket table.
pub(crate) fn get_kbuckets_status(&mut self) -> KBucketStatus {
let mut kbucket_table_stats = vec![];
let mut index = 0;
let mut total_peers = 0;
let mut total_relay_peers = 0;

let mut peers_in_non_full_buckets = 0;
let mut relay_peers_in_non_full_buckets = 0;
let mut num_of_full_buckets = 0;

for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let range = kbucket.range();
let num_entires = kbucket.num_entries();

kbucket.iter().for_each(|entry| {
if is_a_relayed_peer(entry.node.value.iter()) {
total_relay_peers += 1;
if num_entires < K_VALUE.get() {
relay_peers_in_non_full_buckets += 1;
}
}
});

if num_entires >= K_VALUE.get() {
num_of_full_buckets += 1;
} else {
Expand All @@ -324,51 +358,49 @@ impl SwarmDriver {
}
index += 1;
}
(
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
)
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);

let (
index,
KBucketStatus {
total_buckets: index,
total_peers,
total_relay_peers,
peers_in_non_full_buckets,
relay_peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
) = self.kbuckets_status();
estimated_network_size,
}
}

let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
/// Update SwarmDriver field & also record metrics based on the newly calculated kbucket status.
pub(crate) fn update_on_kbucket_status(&mut self, status: &KBucketStatus) {
self.peers_in_rt = status.total_peers;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(status.total_peers as i64);

let _ = metrics_recorder
.relay_peers_in_routing_table
.set(status.total_relay_peers as i64);

let estimated_network_size = Self::estimate_network_size(
status.peers_in_non_full_buckets,
status.num_of_full_buckets,
);
let _ = metrics_recorder
.estimated_network_size
.set(estimated_network_size as i64);
}

// Just to warn if our tracking goes out of sync with libp2p. Can happen if someone forgets to call
// update_on_peer_addition or update_on_peer_removal when adding or removing a peer.
// Only log every 10th peer to avoid spamming the logs.
if total_peers % 10 == 0 && total_peers != self.peers_in_rt {
warn!(
"Total peers in routing table: {}, but kbucket table has {total_peers} peers",
self.peers_in_rt
let _ = metrics_recorder.relay_peers_percentage.set(
(status.relay_peers_in_non_full_buckets as f64
/ status.peers_in_non_full_buckets as f64)
* 100.0,
);
}

info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}, estimated network size: {estimated_network_size:?}");
#[cfg(feature = "loud")]
println!("Estimated network size: {estimated_network_size:?}");
}

/// Estimate the number of nodes in the network
Expand Down
34 changes: 31 additions & 3 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,25 @@ impl SwarmDriver {
renewed: _,
} => {
self.connected_relay_clients.insert(src_peer_id);
info!("Relay reservation accepted from {src_peer_id:?}. Relay client count: {}", self.connected_relay_clients.len());

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}
libp2p::relay::Event::ReservationTimedOut { src_peer_id } => {
self.connected_relay_clients.remove(&src_peer_id);
info!("Relay reservation timed out from {src_peer_id:?}. Relay client count: {}", self.connected_relay_clients.len());

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}
_ => {}
}
Expand Down Expand Up @@ -264,6 +280,14 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);

if num_established == 0 && self.connected_relay_clients.remove(&peer_id) {
info!(
"Relay client has been disconnected: {peer_id:?}. Relay client count: {}",
self.connected_relay_clients.len()
);
}

self.record_connection_metrics();
}
SwarmEvent::OutgoingConnectionError {
Expand Down Expand Up @@ -594,13 +618,17 @@ impl SwarmDriver {
/// Record the metrics on update of connection state.
fn record_connection_metrics(&self) {
#[cfg(feature = "open-metrics")]
if let Some(metrics) = &self.metrics_recorder {
metrics
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.open_connections
.set(self.live_connected_peers.len() as i64);
metrics
metrics_recorder
.connected_peers
.set(self.swarm.connected_peers().count() as i64);

metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}

Expand Down
Loading

0 comments on commit 74ea969

Please sign in to comment.