Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement of relay related metrics; remove relay clients if we have no more connections #2767

Merged
merged 6 commits into from
Feb 21, 2025
13 changes: 3 additions & 10 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,8 @@ impl SwarmDriver {
sender,
} => {
cmd_string = "GetLocalQuotingMetrics";
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);
let (quoting_metrics, is_already_stored) = match self
.swarm
.behaviour_mut()
Expand All @@ -644,7 +637,7 @@ impl SwarmDriver {
&key,
data_type,
data_size,
Some(estimated_network_size as u64),
Some(kbucket_status.estimated_network_size as u64),
) {
Ok(res) => res,
Err(err) => {
Expand Down
20 changes: 8 additions & 12 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,24 +841,20 @@ impl SwarmDriver {
}
_ = set_farthest_record_interval.tick() => {
if !self.is_client {
let (
_index,
_total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
_kbucket_table_stats,
) = self.kbuckets_status();
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
if estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {estimated_network_size}, with {peers_in_non_full_buckets} peers_in_non_full_buckets and {num_of_full_buckets}num_of_full_buckets.");
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);
if kbucket_status.estimated_network_size <= CLOSE_GROUP_SIZE {
info!("Not enough estimated network size {}, with {} peers_in_non_full_buckets and {} num_of_full_buckets.",
kbucket_status.estimated_network_size,
kbucket_status.peers_in_non_full_buckets,
kbucket_status.num_of_full_buckets);
continue;
}
// The entire Distance space is U256
// (U256::MAX is 115792089237316195423570985008687907853269984665640564039457584007913129639935)
// The network density (average distance among nodes) can be estimated as:
// network_density = entire_U256_space / estimated_network_size
let density = U256::MAX / U256::from(estimated_network_size);
let density = U256::MAX / U256::from(kbucket_status.estimated_network_size);
let density_distance = density * U256::from(CLOSE_GROUP_SIZE);

// Use distance to close peer to avoid the situation that
Expand Down
2 changes: 1 addition & 1 deletion ant-networking/src/event/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl SwarmDriver {
.collect(),
};

let is_relayed_peer = is_a_relayed_peer(&addrs);
let is_relayed_peer = is_a_relayed_peer(addrs.iter());

let is_bootstrap_peer = self
.bootstrap_peers
Expand Down
148 changes: 90 additions & 58 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod kad;
mod request_response;
mod swarm;

use crate::{driver::SwarmDriver, error::Result};
use crate::{driver::SwarmDriver, error::Result, relay_manager::is_a_relayed_peer};
use core::fmt;
use custom_debug::Debug as CustomDebug;
use libp2p::{
Expand All @@ -37,8 +37,32 @@ use std::{
};
use tokio::sync::oneshot;

// (total_buckets, total_peers, peers_in_non_full_buckets, num_of_full_buckets, kbucket_table_stats)
type KBucketStatus = (usize, usize, usize, usize, Vec<(usize, usize, u32)>);
#[derive(Debug, Clone)]
pub(crate) struct KBucketStatus {
pub(crate) total_buckets: usize,
pub(crate) total_peers: usize,
pub(crate) total_relay_peers: usize,
pub(crate) peers_in_non_full_buckets: usize,
pub(crate) relay_peers_in_non_full_buckets: usize,
pub(crate) num_of_full_buckets: usize,
pub(crate) kbucket_table_stats: Vec<(usize, usize, u32)>,
pub(crate) estimated_network_size: usize,
}

impl KBucketStatus {
pub(crate) fn log(&self) {
info!(
"kBucketTable has {:?} kbuckets {:?} peers ({} relay peers), {:?}, estimated network size: {:?}",
self.total_buckets,
self.total_peers,
self.total_relay_peers,
self.kbucket_table_stats,
self.estimated_network_size
);
#[cfg(feature = "loud")]
println!("Estimated network size: {:?}", self.estimated_network_size);
}
}

/// NodeEvent enum
#[derive(CustomDebug)]
Expand Down Expand Up @@ -239,76 +263,86 @@ impl SwarmDriver {

/// Update state on addition of a peer to the routing table.
pub(crate) fn update_on_peer_addition(&mut self, added_peer: PeerId, addresses: Addresses) {
self.peers_in_rt = self.peers_in_rt.saturating_add(1);
let n_peers = self.peers_in_rt;
info!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);

let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(added_peer));
info!("New peer added to routing table: {added_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
self.peers_in_rt, distance.ilog2());

#[cfg(feature = "loud")]
println!("New peer added to routing table: {added_peer:?}, now we have #{n_peers} connected peers");
println!(
"New peer added to routing table: {added_peer:?}, now we have #{} connected peers",
self.peers_in_rt
);

kbucket_status.log();

if let Some(bootstrap_cache) = &mut self.bootstrap_cache {
for addr in addresses.iter() {
bootstrap_cache.add_addr(addr.clone());
}
}

self.log_kbuckets(&added_peer);
self.send_event(NetworkEvent::PeerAdded(added_peer, self.peers_in_rt));

#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(self.peers_in_rt as i64);
}
}

/// Update state on removal of a peer from the routing table.
pub(crate) fn update_on_peer_removal(&mut self, removed_peer: PeerId) {
self.peers_in_rt = self.peers_in_rt.saturating_sub(1);
let kbucket_status = self.get_kbuckets_status();
self.update_on_kbucket_status(&kbucket_status);

// ensure we disconnect bad peer
// err result just means no connections were open
let _result = self.swarm.disconnect_peer_id(removed_peer);

let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(removed_peer));
info!(
"Peer removed from routing table: {removed_peer:?}, now we have #{} connected peers",
self.peers_in_rt
"Peer removed from routing table: {removed_peer:?}. We now have #{} connected peers. It has a {:?} distance to us.",
self.peers_in_rt, distance.ilog2()
);
self.log_kbuckets(&removed_peer);

self.send_event(NetworkEvent::PeerRemoved(removed_peer, self.peers_in_rt));

kbucket_status.log();

#[cfg(feature = "open-metrics")]
if self.metrics_recorder.is_some() {
self.check_for_change_in_our_close_group();
}

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(self.peers_in_rt as i64);
}
}

/// Collect kbuckets status
pub(crate) fn kbuckets_status(&mut self) -> KBucketStatus {
/// Get the status of the kbucket table.
pub(crate) fn get_kbuckets_status(&mut self) -> KBucketStatus {
let mut kbucket_table_stats = vec![];
let mut index = 0;
let mut total_peers = 0;
let mut total_relay_peers = 0;

let mut peers_in_non_full_buckets = 0;
let mut relay_peers_in_non_full_buckets = 0;
let mut num_of_full_buckets = 0;

for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let range = kbucket.range();
let num_entires = kbucket.num_entries();

kbucket.iter().for_each(|entry| {
if is_a_relayed_peer(entry.node.value.iter()) {
total_relay_peers += 1;
if num_entires < K_VALUE.get() {
relay_peers_in_non_full_buckets += 1;
}
}
});

if num_entires >= K_VALUE.get() {
num_of_full_buckets += 1;
} else {
Expand All @@ -324,51 +358,49 @@ impl SwarmDriver {
}
index += 1;
}
(
index,
total_peers,
peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
)
}

/// Logs the kbuckets also records the bucket info.
pub(crate) fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());
let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);

let (
index,
KBucketStatus {
total_buckets: index,
total_peers,
total_relay_peers,
peers_in_non_full_buckets,
relay_peers_in_non_full_buckets,
num_of_full_buckets,
kbucket_table_stats,
) = self.kbuckets_status();
estimated_network_size,
}
}

let estimated_network_size =
Self::estimate_network_size(peers_in_non_full_buckets, num_of_full_buckets);
/// Update SwarmDriver field & also record metrics based on the newly calculated kbucket status.
pub(crate) fn update_on_kbucket_status(&mut self, status: &KBucketStatus) {
self.peers_in_rt = status.total_peers;
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.peers_in_routing_table
.set(status.total_peers as i64);

let _ = metrics_recorder
.relay_peers_in_routing_table
.set(status.total_relay_peers as i64);

let estimated_network_size = Self::estimate_network_size(
status.peers_in_non_full_buckets,
status.num_of_full_buckets,
);
let _ = metrics_recorder
.estimated_network_size
.set(estimated_network_size as i64);
}

// Just to warn if our tracking goes out of sync with libp2p. Can happen if someone forgets to call
// update_on_peer_addition or update_on_peer_removal when adding or removing a peer.
// Only log every 10th peer to avoid spamming the logs.
if total_peers % 10 == 0 && total_peers != self.peers_in_rt {
warn!(
"Total peers in routing table: {}, but kbucket table has {total_peers} peers",
self.peers_in_rt
let _ = metrics_recorder.relay_peers_percentage.set(
(status.relay_peers_in_non_full_buckets as f64
/ status.peers_in_non_full_buckets as f64)
* 100.0,
);
}

info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}, estimated network size: {estimated_network_size:?}");
#[cfg(feature = "loud")]
println!("Estimated network size: {estimated_network_size:?}");
}

/// Estimate the number of nodes in the network
Expand Down
34 changes: 31 additions & 3 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,25 @@ impl SwarmDriver {
renewed: _,
} => {
self.connected_relay_clients.insert(src_peer_id);
info!("Relay reservation accepted from {src_peer_id:?}. Relay client count: {}", self.connected_relay_clients.len());

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}
libp2p::relay::Event::ReservationTimedOut { src_peer_id } => {
self.connected_relay_clients.remove(&src_peer_id);
info!("Relay reservation timed out from {src_peer_id:?}. Relay client count: {}", self.connected_relay_clients.len());

#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}
_ => {}
}
Expand Down Expand Up @@ -264,6 +280,14 @@ impl SwarmDriver {
event_string = "ConnectionClosed";
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);

if num_established == 0 && self.connected_relay_clients.remove(&peer_id) {
info!(
"Relay client has been disconnected: {peer_id:?}. Relay client count: {}",
self.connected_relay_clients.len()
);
}

self.record_connection_metrics();
}
SwarmEvent::OutgoingConnectionError {
Expand Down Expand Up @@ -594,13 +618,17 @@ impl SwarmDriver {
/// Record the metrics on update of connection state.
fn record_connection_metrics(&self) {
#[cfg(feature = "open-metrics")]
if let Some(metrics) = &self.metrics_recorder {
metrics
if let Some(metrics_recorder) = &self.metrics_recorder {
metrics_recorder
.open_connections
.set(self.live_connected_peers.len() as i64);
metrics
metrics_recorder
.connected_peers
.set(self.swarm.connected_peers().count() as i64);

metrics_recorder
.connected_relay_clients
.set(self.connected_relay_clients.len() as i64);
}
}

Expand Down
Loading
Loading