Skip to content

Commit

Permalink
pool: replace tokio::sync::RwLock with std::sync::Mutex for `Rela…
Browse files Browse the repository at this point in the history
…y` subscriptions

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jan 28, 2025
1 parent 1a7cd2c commit 8236e93
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 134 deletions.
44 changes: 17 additions & 27 deletions bindings/nostr-sdk-ffi/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ impl Relay {
self.inner.document().await.into()
}

pub async fn subscriptions(&self) -> HashMap<String, Vec<Arc<Filter>>> {
pub fn subscriptions(&self) -> HashMap<String, Vec<Arc<Filter>>> {
self.inner
.subscriptions()
.await
.into_iter()
.map(|(id, filters)| {
(
Expand All @@ -180,11 +179,10 @@ impl Relay {
}

/// Get filters by subscription ID
pub async fn subscription(&self, id: String) -> Option<Vec<Arc<Filter>>> {
pub fn subscription(&self, id: String) -> Option<Vec<Arc<Filter>>> {
let id = SubscriptionId::new(id);
self.inner
.subscription(&id)
.await
.map(|f| f.into_iter().map(|f| Arc::new(f.into())).collect())
}

Expand Down Expand Up @@ -254,11 +252,7 @@ impl Relay {
/// It's possible to automatically close a subscription by configuring the `SubscribeOptions`.
///
/// Note: auto-closing subscriptions aren't saved in subscriptions map!
pub async fn subscribe(
&self,
filters: Vec<Arc<Filter>>,
opts: &SubscribeOptions,
) -> Result<String> {
pub fn subscribe(&self, filters: Vec<Arc<Filter>>, opts: &SubscribeOptions) -> Result<String> {
Ok(self
.inner
.subscribe(
Expand All @@ -267,8 +261,7 @@ impl Relay {
.map(|f| f.as_ref().deref().clone())
.collect(),
**opts,
)
.await?
)?
.to_string())
}

Expand All @@ -279,33 +272,30 @@ impl Relay {
/// It's possible to automatically close a subscription by configuring the `SubscribeOptions`.
///
/// Note: auto-closing subscriptions aren't saved in subscriptions map!
pub async fn subscribe_with_id(
pub fn subscribe_with_id(
&self,
id: String,
filters: Vec<Arc<Filter>>,
opts: &SubscribeOptions,
) -> Result<()> {
Ok(self
.inner
.subscribe_with_id(
SubscriptionId::new(id),
filters
.into_iter()
.map(|f| f.as_ref().deref().clone())
.collect(),
**opts,
)
.await?)
Ok(self.inner.subscribe_with_id(
SubscriptionId::new(id),
filters
.into_iter()
.map(|f| f.as_ref().deref().clone())
.collect(),
**opts,
)?)
}

/// Unsubscribe
pub async fn unsubscribe(&self, id: String) -> Result<()> {
Ok(self.inner.unsubscribe(SubscriptionId::new(id)).await?)
pub fn unsubscribe(&self, id: String) -> Result<()> {
Ok(self.inner.unsubscribe(SubscriptionId::new(id))?)
}

/// Unsubscribe from all subscriptions
pub async fn unsubscribe_all(&self) -> Result<()> {
Ok(self.inner.unsubscribe_all().await?)
pub fn unsubscribe_all(&self) -> Result<()> {
Ok(self.inner.unsubscribe_all()?)
}

/// Fetch events
Expand Down
17 changes: 5 additions & 12 deletions bindings/nostr-sdk-js/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,11 @@ impl JsRelay {
/// ### Auto-closing subscription
///
/// It's possible to automatically close a subscription by configuring the `SubscribeOptions`.
pub async fn subscribe(
&self,
filters: Vec<JsFilter>,
opts: &JsSubscribeOptions,
) -> Result<String> {
pub fn subscribe(&self, filters: Vec<JsFilter>, opts: &JsSubscribeOptions) -> Result<String> {
let filters: Vec<Filter> = filters.into_iter().map(|f| f.into()).collect();
Ok(self
.inner
.subscribe(filters, **opts) // TODO: allow to pass opts as reference
.await
.map_err(into_err)?
.to_string())
}
Expand All @@ -232,7 +227,7 @@ impl JsRelay {
///
/// It's possible to automatically close a subscription by configuring the `SubscribeOptions`.
#[wasm_bindgen(js_name = subscribeWithId)]
pub async fn subscribe_with_id(
pub fn subscribe_with_id(
&self,
id: &str,
filters: Vec<JsFilter>,
Expand All @@ -241,22 +236,20 @@ impl JsRelay {
let filters: Vec<Filter> = filters.into_iter().map(|f| f.into()).collect();
self.inner
.subscribe_with_id(SubscriptionId::new(id), filters, **opts) // TODO: allow to pass opts as reference
.await
.map_err(into_err)
}

/// Unsubscribe
pub async fn unsubscribe(&self, id: String) -> Result<()> {
pub fn unsubscribe(&self, id: String) -> Result<()> {
self.inner
.unsubscribe(SubscriptionId::new(id))
.await
.map_err(into_err)
}

/// Unsubscribe from all subscriptions
#[wasm_bindgen(js_name = unsubscribeAll)]
pub async fn unsubscribe_all(&self) -> Result<()> {
self.inner.unsubscribe_all().await.map_err(into_err)
pub fn unsubscribe_all(&self) -> Result<()> {
self.inner.unsubscribe_all().map_err(into_err)
}

/// Fetch events
Expand Down
3 changes: 1 addition & 2 deletions crates/nostr-relay-pool/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ impl InnerRelayPool {
for (id, filters) in subscriptions.into_iter() {
relay
.inner
.update_long_lived_subscription(id, filters, false)
.await;
.update_long_lived_subscription(id, filters, false);
}
}

Expand Down
18 changes: 4 additions & 14 deletions crates/nostr-relay-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,24 +761,14 @@ impl RelayPool {
return Err(Error::RelayNotFound);
}

let mut urls: Vec<RelayUrl> = Vec::with_capacity(targets.len());
let mut futures = Vec::with_capacity(targets.len());
let mut output: Output<()> = Output::default();

// Compose futures
for (url, filters) in targets.into_iter() {
let relay: &Relay = self.internal_relay(&relays, &url)?;
let id: SubscriptionId = id.clone();
urls.push(url);
futures.push(relay.subscribe_with_id(id, filters, opts));
}

// Join futures
let list = future::join_all(futures).await;

// Iter results and construct output
for (url, result) in urls.into_iter().zip(list.into_iter()) {
match result {
match relay.subscribe_with_id(id, filters, opts) {
Ok(..) => {
// Success, insert relay url in 'success' set result
output.success.insert(url);
Expand Down Expand Up @@ -808,7 +798,7 @@ impl RelayPool {

// Remove subscription from relays
for relay in relays.values() {
if let Err(e) = relay.unsubscribe(id.clone()).await {
if let Err(e) = relay.unsubscribe(id.clone()) {
tracing::error!("{e}");
}
}
Expand All @@ -822,11 +812,11 @@ impl RelayPool {
// Lock with read shared access
let relays = self.inner.atomic.relays.read().await;

// TODO: use join_all and return `Output`?
// TODO: return `Output`?

// Unsubscribe relays
for relay in relays.values() {
if let Err(e) = relay.unsubscribe_all().await {
if let Err(e) = relay.unsubscribe_all() {
tracing::error!("{e}");
}
}
Expand Down
Loading

0 comments on commit 8236e93

Please sign in to comment.