Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: segregate command and query for announce request #792

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions packages/torrent-repository/benches/helpers/asyn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ where

let info_hash = InfoHash([0; 20]);

torrent_repository
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await;

torrent_repository.get_swarm_metadata(&info_hash).await;
}

start.elapsed()
Expand All @@ -37,19 +37,19 @@ where
let handles = FuturesUnordered::new();

// Add the torrent/peer to the torrent repository
torrent_repository
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;

torrent_repository.get_swarm_metadata(info_hash).await;

let start = Instant::now();

for _ in 0..samples {
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER).await;

torrent_repository_clone.get_swarm_metadata(info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -87,9 +87,9 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;

torrent_repository_clone.get_swarm_metadata(&info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -123,9 +123,8 @@ where

// Add the torrents/peers to the torrent repository
for info_hash in &info_hashes {
torrent_repository
.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER)
.await;
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await;
torrent_repository.get_swarm_metadata(info_hash).await;
}

let start = Instant::now();
Expand All @@ -134,9 +133,8 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone
.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER)
.await;
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await;
torrent_repository_clone.get_swarm_metadata(&info_hash).await;

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down
22 changes: 16 additions & 6 deletions packages/torrent-repository/benches/helpers/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ where

let info_hash = InfoHash([0; 20]);

torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER);

torrent_repository.get_swarm_metadata(&info_hash);
}

start.elapsed()
Expand All @@ -37,15 +39,19 @@ where
let handles = FuturesUnordered::new();

// Add the torrent/peer to the torrent repository
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);

torrent_repository.get_swarm_metadata(info_hash);

let start = Instant::now();

for _ in 0..samples {
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER);

torrent_repository_clone.get_swarm_metadata(info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -83,7 +89,9 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);

torrent_repository_clone.get_swarm_metadata(&info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down Expand Up @@ -117,7 +125,8 @@ where

// Add the torrents/peers to the torrent repository
for info_hash in &info_hashes {
torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER);
torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER);
torrent_repository.get_swarm_metadata(info_hash);
}

let start = Instant::now();
Expand All @@ -126,7 +135,8 @@ where
let torrent_repository_clone = torrent_repository.clone();

let handle = runtime.spawn(async move {
torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER);
torrent_repository_clone.get_swarm_metadata(&info_hash);

if let Some(sleep_time) = sleep {
let start_time = std::time::Instant::now();
Expand Down
20 changes: 6 additions & 14 deletions packages/torrent-repository/src/entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub trait Entry {
/// It returns the swarm metadata (statistics) as a struct:
///
/// `(seeders, completed, leechers)`
fn get_stats(&self) -> SwarmMetadata;
fn get_swarm_metadata(&self) -> SwarmMetadata;

/// Returns True if Still a Valid Entry according to the Tracker Policy
fn is_good(&self, policy: &TrackerPolicy) -> bool;
Expand All @@ -40,31 +40,27 @@ pub trait Entry {
///
/// The number of peers that have complete downloading is synchronously updated when peers are updated.
/// That's the total torrent downloads counter.
fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool;

// It preforms a combined operation of `insert_or_update_peer` and `get_stats`.
fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool;

/// It removes peer from the swarm that have not been updated for more than `current_cutoff` seconds
fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch);
}

#[allow(clippy::module_name_repetitions)]
pub trait EntrySync {
fn get_stats(&self) -> SwarmMetadata;
fn get_swarm_metadata(&self) -> SwarmMetadata;
fn is_good(&self, policy: &TrackerPolicy) -> bool;
fn peers_is_empty(&self) -> bool;
fn get_peers_len(&self) -> usize;
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&self, peer: &peer::Peer) -> bool;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
}

#[allow(clippy::module_name_repetitions)]
pub trait EntryAsync {
fn get_stats(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
fn get_swarm_metadata(&self) -> impl std::future::Future<Output = SwarmMetadata> + Send;
fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future<Output = bool> + Send;
fn peers_is_empty(&self) -> impl std::future::Future<Output = bool> + Send;
fn get_peers_len(&self) -> impl std::future::Future<Output = usize> + Send;
Expand All @@ -74,11 +70,7 @@ pub trait EntryAsync {
client: &SocketAddr,
limit: Option<usize>,
) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
fn insert_or_update_peer_and_get_stats(
self,
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + std::marker::Send;
fn upsert_peer(self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
}

Expand Down
14 changes: 4 additions & 10 deletions packages/torrent-repository/src/entry/mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use super::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

impl EntrySync for EntryMutexStd {
fn get_stats(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_stats()
fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().expect("it should get a lock").get_swarm_metadata()
}

fn is_good(&self, policy: &TrackerPolicy) -> bool {
Expand All @@ -33,14 +33,8 @@ impl EntrySync for EntryMutexStd {
self.lock().expect("it should get lock").get_peers_for_client(client, limit)
}

fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool {
self.lock().expect("it should lock the entry").insert_or_update_peer(peer)
}

fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
self.lock()
.expect("it should lock the entry")
.insert_or_update_peer_and_get_stats(peer)
fn upsert_peer(&self, peer: &peer::Peer) -> bool {
self.lock().expect("it should lock the entry").upsert_peer(peer)
}

fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
Expand Down
12 changes: 4 additions & 8 deletions packages/torrent-repository/src/entry/mutex_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use super::{Entry, EntryAsync};
use crate::{EntryMutexTokio, EntrySingle};

impl EntryAsync for EntryMutexTokio {
async fn get_stats(&self) -> SwarmMetadata {
self.lock().await.get_stats()
async fn get_swarm_metadata(&self) -> SwarmMetadata {
self.lock().await.get_swarm_metadata()
}

async fn check_good(self, policy: &TrackerPolicy) -> bool {
Expand All @@ -33,12 +33,8 @@ impl EntryAsync for EntryMutexTokio {
self.lock().await.get_peers_for_client(client, limit)
}

async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool {
self.lock().await.insert_or_update_peer(peer)
}

async fn insert_or_update_peer_and_get_stats(self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
self.lock().await.insert_or_update_peer_and_get_stats(peer)
async fn upsert_peer(self, peer: &peer::Peer) -> bool {
self.lock().await.upsert_peer(peer)
}

async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) {
Expand Down
10 changes: 2 additions & 8 deletions packages/torrent-repository/src/entry/single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::EntrySingle;

impl Entry for EntrySingle {
#[allow(clippy::cast_possible_truncation)]
fn get_stats(&self) -> SwarmMetadata {
fn get_swarm_metadata(&self) -> SwarmMetadata {
let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32;
let incomplete: u32 = self.peers.len() as u32 - complete;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Entry for EntrySingle {
}
}

fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
fn upsert_peer(&mut self, peer: &peer::Peer) -> bool {
let mut downloaded_stats_updated: bool = false;

match peer::ReadInfo::get_event(peer) {
Expand All @@ -93,12 +93,6 @@ impl Entry for EntrySingle {
downloaded_stats_updated
}

fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
let changed = self.insert_or_update_peer(peer);
let stats = self.get_stats();
(changed, stats)
}

fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
self.peers
.retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff);
Expand Down
16 changes: 9 additions & 7 deletions packages/torrent-repository/src/repository/dash_map_mutex_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ where
EntryMutexStd: EntrySync,
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
if let Some(entry) = self.torrents.get(info_hash) {
entry.insert_or_update_peer_and_get_stats(peer)
entry.upsert_peer(peer);
} else {
let _unused = self.torrents.insert(*info_hash, Arc::default());

match self.torrents.get(info_hash) {
Some(entry) => entry.insert_or_update_peer_and_get_stats(peer),
None => (false, SwarmMetadata::zeroed()),
if let Some(entry) = self.torrents.get(info_hash) {
entry.upsert_peer(peer);
}
}
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
let maybe_entry = self.torrents.get(key);
maybe_entry.map(|entry| entry.clone())
Expand All @@ -45,7 +47,7 @@ where
let mut metrics = TorrentsMetrics::default();

for entry in &self.torrents {
let stats = entry.value().lock().expect("it should get a lock").get_stats();
let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
Expand Down
10 changes: 4 additions & 6 deletions packages/torrent-repository/src/repository/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ pub trait Repository<T>: Debug + Default + Sized + 'static {
fn remove(&self, key: &InfoHash) -> Option<T>;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch);
fn remove_peerless_torrents(&self, policy: &TrackerPolicy);
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata);
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer);
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata>;
}

#[allow(clippy::module_name_repetitions)]
Expand All @@ -36,9 +37,6 @@ pub trait RepositoryAsync<T>: Debug + Default + Sized + 'static {
fn remove(&self, key: &InfoHash) -> impl std::future::Future<Output = Option<T>> + Send;
fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future<Output = ()> + Send;
fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = ()> + Send;
fn update_torrent_with_peer_and_get_stats(
&self,
info_hash: &InfoHash,
peer: &peer::Peer,
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future<Output = ()> + Send;
fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future<Output = Option<SwarmMetadata>> + Send;
}
10 changes: 7 additions & 3 deletions packages/torrent-repository/src/repository/rw_lock_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ impl Repository<EntrySingle> for TorrentsRwLockStd
where
EntrySingle: Entry,
{
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) {
let mut db = self.get_torrents_mut();

let entry = db.entry(*info_hash).or_insert(EntrySingle::default());

entry.insert_or_update_peer_and_get_stats(peer)
entry.upsert_peer(peer);
}

fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option<SwarmMetadata> {
self.get(info_hash).map(|entry| entry.get_swarm_metadata())
}

fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
Expand All @@ -64,7 +68,7 @@ where
let mut metrics = TorrentsMetrics::default();

for entry in self.get_torrents().values() {
let stats = entry.get_stats();
let stats = entry.get_swarm_metadata();
metrics.complete += u64::from(stats.complete);
metrics.downloaded += u64::from(stats.downloaded);
metrics.incomplete += u64::from(stats.incomplete);
Expand Down
Loading
Loading