From 622d2446f1bc7dc3bba93eac8c641c311d6eaf80 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 12 May 2021 08:45:03 +0200 Subject: [PATCH 01/25] [ws server]: draft SubscriptionSinkWithParams --- ws-server/src/server.rs | 57 ++++++++++++++++++++++++++++-- ws-server/src/server/module.rs | 64 ++++++++++++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 5 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 109bd78223..c816767ecb 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -29,11 +29,10 @@ use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; use parking_lot::Mutex; use rustc_hash::FxHashMap; -use serde::Serialize; +use serde::{Serialize, de::DeserializeOwned}; use serde_json::value::to_raw_value; use soketto::handshake::{server::Response, Server as SokettoServer}; -use std::net::SocketAddr; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::net::{TcpListener, ToSocketAddrs}; use tokio_stream::wrappers::TcpListenerStream; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -89,6 +88,49 @@ impl SubscriptionSink { } } +pub struct InnerSubSinkParams

{ + /// Sink. + sink: mpsc::UnboundedSender, + /// Params. + params: P, + /// Subscription ID. + sub_id: SubscriptionId, + /// Method name + method: &'static str, +} + +impl

InnerSubSinkParams

{ + pub fn send(&self, result: &T) -> anyhow::Result<()> + where + T: Serialize, + { + let result = to_raw_value(result)?; + let msg = serde_json::to_string(&JsonRpcNotification { + jsonrpc: TwoPointZero, + method: self.method, + params: JsonRpcNotificationParams { subscription: self.sub_id, result: &*result }, + })?; + + self.sink.unbounded_send(msg).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + pub fn params(&self) -> &P { + &self.params + } +} + +pub struct SubscriptionSinkParams

{ + inner: Arc>>> +} + +impl

SubscriptionSinkParams

{ + pub fn next(&self) -> Option> { + let mut subs = self.inner.lock(); + let key = subs.keys().next().copied()?; + subs.remove(&key) + } +} + pub struct Server { root: RpcModule, listener: TcpListener, @@ -120,6 +162,15 @@ impl Server { self.root.register_subscription(subscribe_method_name, unsubscribe_method_name) } + /// Register a new RPC subscription with the possibility to get the params in subscription request. + pub fn register_subscription_with_params( + &mut self, + subscribe_method_name: &'static str, + unsubscribe_method_name: &'static str, + ) -> Result, Error> { + self.root.register_subscription_with_params(subscribe_method_name, unsubscribe_method_name) + } + /// Register all methods from a module on this server. pub fn register_module(&mut self, module: RpcModule) -> Result<(), Error> { self.root.merge(module) diff --git a/ws-server/src/server/module.rs b/ws-server/src/server/module.rs index bed04c5a3e..62e215a551 100644 --- a/ws-server/src/server/module.rs +++ b/ws-server/src/server/module.rs @@ -1,4 +1,4 @@ -use crate::server::{RpcParams, SubscriptionId, SubscriptionSink}; +use crate::server::{InnerSubSinkParams, RpcParams, SubscriptionId, SubscriptionSink, SubscriptionSinkParams}; use jsonrpsee_types::{ error::{CallError, Error}, v2::error::{JsonRpcErrorCode, JsonRpcErrorObject}, @@ -7,7 +7,7 @@ use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE}; use jsonrpsee_utils::server::{send_error, send_response, Methods}; use parking_lot::Mutex; use rustc_hash::FxHashMap; -use serde::Serialize; +use serde::{de::DeserializeOwned, Serialize}; use std::sync::Arc; #[derive(Default)] @@ -122,6 +122,66 @@ impl RpcModule { Ok(SubscriptionSink { method: subscribe_method_name, subscribers }) } + /// Register a new RPC subscription, with subscribe and unsubscribe methods. + pub fn register_subscription_with_params( + &mut self, + subscribe_method_name: &'static str, + unsubscribe_method_name: &'static str, + ) -> Result, Error> { + if subscribe_method_name == unsubscribe_method_name { + return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); + } + + self.verify_method_name(subscribe_method_name)?; + self.verify_method_name(unsubscribe_method_name)?; + + let subscribers = Arc::new(Mutex::new(FxHashMap::default())); + + { + let subscribers = subscribers.clone(); + self.methods.insert( + subscribe_method_name, + Box::new(move |id, params, tx, _| { + let params = params.parse().map_err(|_| CallError::InvalidParams)?; + let sub_id = { + const JS_NUM_MASK: SubscriptionId = !0 >> 11; + + let sub_id = rand::random::() & JS_NUM_MASK; + + subscribers.lock().insert( + sub_id, + InnerSubSinkParams { sink: tx.clone(), params, sub_id, method: subscribe_method_name }, + ); + + sub_id + }; + + send_response(id, tx, sub_id); + + Ok(()) + }), + ); + } + + { + let subscribers = subscribers.clone(); + self.methods.insert( + unsubscribe_method_name, + Box::new(move |id, params, tx, _| { + let sub_id: u64 = params.one().map_err(|e| anyhow::anyhow!("{:?}", e))?; + + subscribers.lock().remove(&sub_id); + + send_response(id, tx, "Unsubscribed"); + + Ok(()) + }), + ); + } + + Ok(SubscriptionSinkParams { inner: subscribers }) + } + pub(crate) fn into_methods(self) -> Methods { self.methods } From 4f1743abe7e91a0f490673cd00218d3c28800e21 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 12 May 2021 10:26:44 +0200 Subject: [PATCH 02/25] rexport types --- ws-server/src/lib.rs | 4 +++- ws-server/src/server.rs | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ws-server/src/lib.rs b/ws-server/src/lib.rs index a589f137b6..4b6edd930e 100644 --- a/ws-server/src/lib.rs +++ b/ws-server/src/lib.rs @@ -32,4 +32,6 @@ mod server; mod tests; pub use jsonrpsee_types::error::Error; -pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink}; +pub use server::{ + InnerSubSinkParams, RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink, SubscriptionSinkParams, +}; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index c816767ecb..82ca2ae2b9 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -29,7 +29,7 @@ use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; use parking_lot::Mutex; use rustc_hash::FxHashMap; -use serde::{Serialize, de::DeserializeOwned}; +use serde::{de::DeserializeOwned, Serialize}; use serde_json::value::to_raw_value; use soketto::handshake::{server::Response, Server as SokettoServer}; use std::{net::SocketAddr, sync::Arc}; @@ -120,7 +120,7 @@ impl

InnerSubSinkParams

{ } pub struct SubscriptionSinkParams

{ - inner: Arc>>> + inner: Arc>>>, } impl

SubscriptionSinkParams

{ From 1e5d0c3dbdc612e696a1a033d78c952b7ac101d4 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 12 May 2021 17:11:44 +0200 Subject: [PATCH 03/25] PoC design2 --- ws-server/src/lib.rs | 2 +- ws-server/src/server.rs | 72 ++++++++++++++++--------------- ws-server/src/server/module.rs | 77 ++++++---------------------------- 3 files changed, 51 insertions(+), 100 deletions(-) diff --git a/ws-server/src/lib.rs b/ws-server/src/lib.rs index 4b6edd930e..1adc0beadc 100644 --- a/ws-server/src/lib.rs +++ b/ws-server/src/lib.rs @@ -33,5 +33,5 @@ mod tests; pub use jsonrpsee_types::error::Error; pub use server::{ - InnerSubSinkParams, RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink, SubscriptionSinkParams, + InnerSink, RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink }; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 82ca2ae2b9..f0ac9cf7d3 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -48,16 +48,17 @@ mod module; pub use module::{RpcContextModule, RpcModule}; type SubscriptionId = u64; -type Subscribers = Arc>>>; +type Subscribers

= Arc>>>; #[derive(Clone)] -pub struct SubscriptionSink { +pub struct SubscriptionSink

{ method: &'static str, - subscribers: Subscribers, + subscribers: Subscribers

, } -impl SubscriptionSink { - pub fn send(&mut self, result: &T) -> anyhow::Result<()> +impl

SubscriptionSink

{ + /// Send message on all subscription without input. + pub fn send_without_input(&mut self, result: &T) -> anyhow::Result<()> where T: Serialize, { @@ -67,6 +68,10 @@ impl SubscriptionSink { let mut subs = self.subscribers.lock(); for ((conn_id, sub_id), sender) in subs.iter() { + if sender.params.is_none() { + continue; + } + let msg = serde_json::to_string(&JsonRpcNotification { jsonrpc: TwoPointZero, method: self.method, @@ -74,7 +79,7 @@ impl SubscriptionSink { })?; // Log broken connections - if sender.unbounded_send(msg).is_err() { + if sender.sink.unbounded_send(msg).is_err() { errored.push((*conn_id, *sub_id)); } } @@ -86,20 +91,41 @@ impl SubscriptionSink { Ok(()) } + + /// Extract subscriptions with input. + /// Usually, you want process the input before sending back + /// data on each subscription. + pub fn extract_with_input(&self) -> Vec> { + let mut subs = self.subscribers.lock(); + + let mut input = Vec::new(); + *subs = std::mem::replace(&mut *subs, FxHashMap::default()) + .into_iter() + .filter_map(|(k, v)| { + if v.params.is_some() { + input.push(v); + None + } else { + Some((k, v)) + } + }) + .collect(); + input + } } -pub struct InnerSubSinkParams

{ +pub struct InnerSink

{ /// Sink. sink: mpsc::UnboundedSender, /// Params. - params: P, + params: Option

, /// Subscription ID. sub_id: SubscriptionId, /// Method name method: &'static str, } -impl

InnerSubSinkParams

{ +impl

InnerSink

{ pub fn send(&self, result: &T) -> anyhow::Result<()> where T: Serialize, @@ -114,23 +140,11 @@ impl

InnerSubSinkParams

{ self.sink.unbounded_send(msg).map_err(|e| anyhow::anyhow!("{:?}", e)) } - pub fn params(&self) -> &P { + pub fn params(&self) -> &Option

{ &self.params } } -pub struct SubscriptionSinkParams

{ - inner: Arc>>>, -} - -impl

SubscriptionSinkParams

{ - pub fn next(&self) -> Option> { - let mut subs = self.inner.lock(); - let key = subs.keys().next().copied()?; - subs.remove(&key) - } -} - pub struct Server { root: RpcModule, listener: TcpListener, @@ -154,23 +168,13 @@ impl Server { } /// Register a new RPC subscription, with subscribe and unsubscribe methods. - pub fn register_subscription( + pub fn register_subscription( &mut self, subscribe_method_name: &'static str, unsubscribe_method_name: &'static str, - ) -> Result { + ) -> Result, Error> { self.root.register_subscription(subscribe_method_name, unsubscribe_method_name) } - - /// Register a new RPC subscription with the possibility to get the params in subscription request. - pub fn register_subscription_with_params( - &mut self, - subscribe_method_name: &'static str, - unsubscribe_method_name: &'static str, - ) -> Result, Error> { - self.root.register_subscription_with_params(subscribe_method_name, unsubscribe_method_name) - } - /// Register all methods from a module on this server. pub fn register_module(&mut self, module: RpcModule) -> Result<(), Error> { self.root.merge(module) diff --git a/ws-server/src/server/module.rs b/ws-server/src/server/module.rs index 62e215a551..b957d095cb 100644 --- a/ws-server/src/server/module.rs +++ b/ws-server/src/server/module.rs @@ -1,4 +1,4 @@ -use crate::server::{InnerSubSinkParams, RpcParams, SubscriptionId, SubscriptionSink, SubscriptionSinkParams}; +use crate::server::{RpcParams, SubscriptionId, SubscriptionSink, InnerSink}; use jsonrpsee_types::{ error::{CallError, Error}, v2::error::{JsonRpcErrorCode, JsonRpcErrorObject}, @@ -67,11 +67,11 @@ impl RpcModule { } /// Register a new RPC subscription, with subscribe and unsubscribe methods. - pub fn register_subscription( + pub fn register_subscription( &mut self, subscribe_method_name: &'static str, unsubscribe_method_name: &'static str, - ) -> Result { + ) -> Result, Error> { if subscribe_method_name == unsubscribe_method_name { return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); } @@ -85,13 +85,20 @@ impl RpcModule { let subscribers = subscribers.clone(); self.methods.insert( subscribe_method_name, - Box::new(move |id, _, tx, conn| { + Box::new(move |id, params, tx, conn| { + let params = params.parse().ok(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11; let sub_id = rand::random::() & JS_NUM_MASK; - subscribers.lock().insert((conn, sub_id), tx.clone()); + let inner = InnerSink { + sink: tx.clone(), + sub_id, + params, + method: subscribe_method_name + }; + subscribers.lock().insert((conn, sub_id), inner); sub_id }; @@ -122,66 +129,6 @@ impl RpcModule { Ok(SubscriptionSink { method: subscribe_method_name, subscribers }) } - /// Register a new RPC subscription, with subscribe and unsubscribe methods. - pub fn register_subscription_with_params( - &mut self, - subscribe_method_name: &'static str, - unsubscribe_method_name: &'static str, - ) -> Result, Error> { - if subscribe_method_name == unsubscribe_method_name { - return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); - } - - self.verify_method_name(subscribe_method_name)?; - self.verify_method_name(unsubscribe_method_name)?; - - let subscribers = Arc::new(Mutex::new(FxHashMap::default())); - - { - let subscribers = subscribers.clone(); - self.methods.insert( - subscribe_method_name, - Box::new(move |id, params, tx, _| { - let params = params.parse().map_err(|_| CallError::InvalidParams)?; - let sub_id = { - const JS_NUM_MASK: SubscriptionId = !0 >> 11; - - let sub_id = rand::random::() & JS_NUM_MASK; - - subscribers.lock().insert( - sub_id, - InnerSubSinkParams { sink: tx.clone(), params, sub_id, method: subscribe_method_name }, - ); - - sub_id - }; - - send_response(id, tx, sub_id); - - Ok(()) - }), - ); - } - - { - let subscribers = subscribers.clone(); - self.methods.insert( - unsubscribe_method_name, - Box::new(move |id, params, tx, _| { - let sub_id: u64 = params.one().map_err(|e| anyhow::anyhow!("{:?}", e))?; - - subscribers.lock().remove(&sub_id); - - send_response(id, tx, "Unsubscribed"); - - Ok(()) - }), - ); - } - - Ok(SubscriptionSinkParams { inner: subscribers }) - } - pub(crate) fn into_methods(self) -> Methods { self.methods } From 420d13de879e9de87a72adafca6e9fa67cce276d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 13 May 2021 10:46:33 +0200 Subject: [PATCH 04/25] improve example --- ws-server/src/server.rs | 50 ++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index f0ac9cf7d3..67499f1303 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -33,6 +33,7 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::value::to_raw_value; use soketto::handshake::{server::Response, Server as SokettoServer}; use std::{net::SocketAddr, sync::Arc}; +use std::convert::{TryFrom, TryInto}; use tokio::net::{TcpListener, ToSocketAddrs}; use tokio_stream::wrappers::TcpListenerStream; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -57,8 +58,12 @@ pub struct SubscriptionSink

{ } impl

SubscriptionSink

{ - /// Send message on all subscription without input. - pub fn send_without_input(&mut self, result: &T) -> anyhow::Result<()> + /// Send a message on all the subscriptions + /// + /// If you have subscriptions with params/input you should most likely + /// call `extract_with_input` to the process the input/params and send out + /// the result on each subscription individually instead. + pub fn send_all(&mut self, result: &T) -> anyhow::Result<()> where T: Serialize, { @@ -68,10 +73,6 @@ impl

SubscriptionSink

{ let mut subs = self.subscribers.lock(); for ((conn_id, sub_id), sender) in subs.iter() { - if sender.params.is_none() { - continue; - } - let msg = serde_json::to_string(&JsonRpcNotification { jsonrpc: TwoPointZero, method: self.method, @@ -94,8 +95,8 @@ impl

SubscriptionSink

{ /// Extract subscriptions with input. /// Usually, you want process the input before sending back - /// data on each subscription. - pub fn extract_with_input(&self) -> Vec> { + /// data on that subscription. + pub fn extract_with_input(&self) -> Vec> { let mut subs = self.subscribers.lock(); let mut input = Vec::new(); @@ -103,7 +104,8 @@ impl

SubscriptionSink

{ .into_iter() .filter_map(|(k, v)| { if v.params.is_some() { - input.push(v); + let with_input = v.try_into().expect("is Some checked above; qed"); + input.push(with_input); None } else { Some((k, v)) @@ -125,7 +127,32 @@ pub struct InnerSink

{ method: &'static str, } -impl

InnerSink

{ +pub struct InnerSinkWithParams

{ + /// Sink. + sink: mpsc::UnboundedSender, + /// Params. + params: P, + /// Subscription ID. + sub_id: SubscriptionId, + /// Method name + method: &'static str, +} + +impl

TryFrom> for InnerSinkWithParams

{ + type Error = (); + + fn try_from(other: InnerSink

) -> Result { + match other.params { + Some(params) => Ok(InnerSinkWithParams { sink: other.sink, params, sub_id: other.sub_id, method: other.method }), + None => Err(()) + } + } +} + +impl

InnerSinkWithParams

{ + /// Send data on a specific subscription + /// Note: a subscription can be "subscribed" to arbitary number of times with + /// diffrent input/params. pub fn send(&self, result: &T) -> anyhow::Result<()> where T: Serialize, @@ -140,7 +167,8 @@ impl

InnerSink

{ self.sink.unbounded_send(msg).map_err(|e| anyhow::anyhow!("{:?}", e)) } - pub fn params(&self) -> &Option

{ + /// Get the input/params of the subscrption. + pub fn params(&self) -> &P { &self.params } } From 6b1ab8b8976aea07b74fa196d8c3ae23343c27d3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 10:00:30 +0200 Subject: [PATCH 05/25] Update ws-server/src/server.rs Co-authored-by: David --- ws-server/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 67499f1303..8f2ec68aee 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -100,7 +100,7 @@ impl

SubscriptionSink

{ let mut subs = self.subscribers.lock(); let mut input = Vec::new(); - *subs = std::mem::replace(&mut *subs, FxHashMap::default()) + *subs = std::mem::take(&mut *subs) .into_iter() .filter_map(|(k, v)| { if v.params.is_some() { From 616468751aaafc5112f9ecce1d3ad8aa24767b2a Mon Sep 17 00:00:00 2001 From: David Date: Tue, 18 May 2021 11:58:52 +0200 Subject: [PATCH 06/25] Subscription example (#324) * Add a test for calling methods with multiple params of multiple types (#308) * Add a test for calling methods with multiple params of multiple types * cargo fmt Co-authored-by: Niklas Adolfsson * [ws client] RegisterNotification support (#303) * Rename NotifResponse to SubscriptionResponse to make room for new impl * Add support for on_notification Subscription types * Fix handling of NotificationHandler in manager * cleanup * Implement NotificationHandler to replace Subscription and clean up plumbing * More cleanup * impl Drop for NotificationHandler * Address pr feedback #1 * ws client register_notification pr feedback 2 * Fix doc * fix typo * Add tests, get NH working * More cleanup of String/&str * fix doc * Drop notification handler on send_back_sink error * ws client notification auto unsubscribe when channel full test * Change order of type params to register_method (#312) * Change order of type params to register_method * Cleanup and fmt * Update ws-server/src/tests.rs Co-authored-by: Niklas Adolfsson * CI: optimize caching (#317) * Bump actions/checkout from 2 to 2.3.4 (#315) Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 2.3.4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v2...v2.3.4) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/cargo from 1 to 1.0.3 (#314) Bumps [actions-rs/cargo](https://github.com/actions-rs/cargo) from 1 to 1.0.3. - [Release notes](https://github.com/actions-rs/cargo/releases) - [Changelog](https://github.com/actions-rs/cargo/blob/master/CHANGELOG.md) - [Commits](https://github.com/actions-rs/cargo/compare/v1...v1.0.3) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/toolchain from 1 to 1.0.7 (#313) Bumps [actions-rs/toolchain](https://github.com/actions-rs/toolchain) from 1 to 1.0.7. - [Release notes](https://github.com/actions-rs/toolchain/releases) - [Changelog](https://github.com/actions-rs/toolchain/blob/master/CHANGELOG.md) - [Commits](https://github.com/actions-rs/toolchain/compare/v1...v1.0.7) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [ws server]: add logs (#319) * WIP - hangs * fix example * cleanup * Add certificate_store() to WsClientBuilder (#321) * Add custom_certificate to WsClientBuilder * Use system certs instead of specified file * Cache client_config * Move client_config logic to fn build * Default use_system_certificates to true * Move out connector * Add CertificateStore type * cargo fmt * cargo clippy * Resolve comment: Rename variable * Resolved comments Co-authored-by: Niklas Adolfsson Co-authored-by: Billy Lindeman Co-authored-by: Denis Pisarev Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Albin Hedman --- examples/Cargo.toml | 4 ++ examples/ws_sub_with_params.rs | 82 ++++++++++++++++++++++++++++++++++ examples/ws_subscription.rs | 4 +- ws-server/src/server/module.rs | 2 +- 4 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 examples/ws_sub_with_params.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 251259e9ee..dbd15dbdab 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -25,6 +25,10 @@ path = "ws.rs" name = "ws_subscription" path = "ws_subscription.rs" +[[example]] +name = "ws_sub_with_params" +path = "ws_sub_with_params.rs" + [[example]] name = "proc_macro" path = "proc_macro.rs" diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs new file mode 100644 index 0000000000..9e77ea333c --- /dev/null +++ b/examples/ws_sub_with_params.rs @@ -0,0 +1,82 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use jsonrpsee::{ + ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder}, + ws_server::WsServer, +}; +use std::net::SocketAddr; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + let addr = run_server().await?; + let url = format!("ws://{}", addr); + + let client = WsClientBuilder::default().build(&url).await?; + + // Subscription with a single parameter + let params = JsonRpcParams::Array(vec![3.into()]); + let mut sub_params_one = client.subscribe::>("sub_one_param", params, "unsub_one_param").await?; + println!("subscription with one param: {:?}", sub_params_one.next().await); + + // Subscription with multiple parameters + let params = JsonRpcParams::Array(vec![2.into(), 5.into()]); + let mut sub_params_two = client.subscribe::("sub_params_two", params, "unsub_params_two").await?; + println!("subscription with two params: {:?}", sub_params_two.next().await); + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz"; + let mut server = WsServer::new("127.0.0.1:0").await?; + let one_param = server.register_subscription("sub_one_param", "unsub_one_param").unwrap(); + let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); + + std::thread::spawn(move || loop { + for sink_params in one_param.extract_with_input().iter() { + let idx = *sink_params.params(); + let result = LETTERS.chars().nth(idx); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(50)); + }); + + std::thread::spawn(move || loop { + for sink_params in two_params.extract_with_input().iter() { + let params: &Vec = sink_params.params(); + // Validate your params here: check len, check > 0 etc + let result = LETTERS[params[0]..params[1]].to_string(); + let _ = sink_params.send(&result); + } + std::thread::sleep(std::time::Duration::from_millis(100)); + }); + + let addr = server.local_addr(); + tokio::spawn(async move { server.start().await }); + addr +} diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index cba815c3a7..73bd687ece 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -54,10 +54,10 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let mut server = WsServer::new("127.0.0.1:0").await?; - let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); + let mut subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { - subscription.send(&"hello my friend").unwrap(); + subscription.send_all(&"hello my friend").unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); }); diff --git a/ws-server/src/server/module.rs b/ws-server/src/server/module.rs index 124e59897d..eb5a5d77b2 100644 --- a/ws-server/src/server/module.rs +++ b/ws-server/src/server/module.rs @@ -86,7 +86,7 @@ impl RpcModule { self.methods.insert( subscribe_method_name, Box::new(move |id, params, tx, conn| { - let params = params.parse().ok(); + let params = params.parse().or_else(|_| params.one().map_err(|_| CallError::InvalidParams)).ok(); let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11; From 7ef0f60376e980b92b22f56e73088fa4d6a21806 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 19 May 2021 22:03:28 +0200 Subject: [PATCH 07/25] grumbles: impl maciej proposal --- examples/ws_sub_with_params.rs | 13 +--- utils/src/server/rpc_module.rs | 109 +++++++++++---------------------- ws-server/src/server.rs | 2 +- 3 files changed, 40 insertions(+), 84 deletions(-) diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs index 9e77ea333c..341a010ef7 100644 --- a/examples/ws_sub_with_params.rs +++ b/examples/ws_sub_with_params.rs @@ -58,21 +58,12 @@ async fn run_server() -> anyhow::Result { let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); std::thread::spawn(move || loop { - for sink_params in one_param.extract_with_input().iter() { - let idx = *sink_params.params(); - let result = LETTERS.chars().nth(idx); - let _ = sink_params.send(&result); - } + let _ = one_param.send_all_with_params(|idx: &usize| LETTERS.chars().nth(*idx)); std::thread::sleep(std::time::Duration::from_millis(50)); }); std::thread::spawn(move || loop { - for sink_params in two_params.extract_with_input().iter() { - let params: &Vec = sink_params.params(); - // Validate your params here: check len, check > 0 etc - let result = LETTERS[params[0]..params[1]].to_string(); - let _ = sink_params.send(&result); - } + let _ = two_params.send_all_with_params(|params: &Vec| LETTERS[params[0]..params[1]].to_string()); std::thread::sleep(std::time::Duration::from_millis(100)); }); diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 2bc61dd378..cd80a15fd3 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -11,7 +11,6 @@ use rustc_hash::FxHashMap; use serde::{de::DeserializeOwned, Serialize}; use serde_json::value::to_raw_value; use std::sync::Arc; -use std::convert::{TryFrom, TryInto}; /// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, /// implemented as a function pointer to a `Fn` function taking four arguments: @@ -114,7 +113,7 @@ impl RpcModule { let sub_id = rand::random::() & JS_NUM_MASK; - let inner = InnerSink { sink: tx.clone(), sub_id, params, method: subscribe_method_name }; + let inner = InnerSink { sink: tx.clone(), params }; subscribers.lock().insert((conn, sub_id), inner); sub_id @@ -218,6 +217,7 @@ impl RpcContextModule { } } +/// Used by the server to send data back to subscribers. #[derive(Clone)] pub struct SubscriptionSink

{ method: &'static str, @@ -228,7 +228,7 @@ impl

SubscriptionSink

{ /// Send a message on all the subscriptions /// /// If you have subscriptions with params/input you should most likely - /// call `extract_with_input` to the process the input/params and send out + /// call `call_with_params` to the process the input/params and send out /// the result on each subscription individually instead. pub fn send_all(&mut self, result: &T) -> anyhow::Result<()> where @@ -260,82 +260,47 @@ impl

SubscriptionSink

{ Ok(()) } - /// Extract subscriptions with input. - /// Usually, you want process the input before sending back - /// data on that subscription. - pub fn extract_with_input(&self) -> Vec> { + /// Send a message to all subscriptions that could parse `P` as input. + /// + /// F: is closure that you need to provide to apply on the input P. + pub fn send_all_with_params(&self, f: F) -> anyhow::Result<()> + where + F: Fn(&P) -> T, + T: Serialize, + { let mut subs = self.subscribers.lock(); + let mut errored = Vec::new(); - let mut input = Vec::new(); - *subs = std::mem::take(&mut *subs) - .into_iter() - .filter_map(|(k, v)| { - if v.params.is_some() { - let with_input = v.try_into().expect("is Some checked above; qed"); - input.push(with_input); - None - } else { - Some((k, v)) - } - }) - .collect(); - input - } -} - -pub struct InnerSink

{ - /// Sink. - sink: mpsc::UnboundedSender, - /// Params. - params: Option

, - /// Subscription ID. - sub_id: SubscriptionId, - /// Method name - method: &'static str, -} - -pub struct InnerSinkWithParams

{ - /// Sink. - sink: mpsc::UnboundedSender, - /// Params. - params: P, - /// Subscription ID. - sub_id: SubscriptionId, - /// Method name - method: &'static str, -} + for ((conn_id, sub_id), sender) in subs.iter() { + let result = match sender.params.as_ref().map(|p| to_raw_value(&f(p))) { + Some(Ok(res)) => res, + _ => continue, + }; -impl

TryFrom> for InnerSinkWithParams

{ - type Error = (); + let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { + jsonrpc: TwoPointZero, + method: self.method, + params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, + })?; - fn try_from(other: InnerSink

) -> Result { - match other.params { - Some(params) => Ok(InnerSinkWithParams { sink: other.sink, params, sub_id: other.sub_id, method: other.method }), - None => Err(()) + // Log broken connections + if sender.sink.unbounded_send(msg).is_err() { + errored.push((*conn_id, *sub_id)); + } } - } -} -impl

InnerSinkWithParams

{ - /// Send data on a specific subscription - /// Note: a subscription can be "subscribed" to arbitary number of times with - /// diffrent input/params. - pub fn send(&self, result: &T) -> anyhow::Result<()> - where - T: Serialize, - { - let result = to_raw_value(result)?; - let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { - jsonrpc: TwoPointZero, - method: self.method, - params: JsonRpcNotificationParams { subscription: self.sub_id, result: &*result }, - })?; + // Remove broken connections + for entry in errored { + subs.remove(&entry); + } - self.sink.unbounded_send(msg).map_err(|e| anyhow::anyhow!("{:?}", e)) + Ok(()) } +} - /// Get the input/params of the subscrption. - pub fn params(&self) -> &P { - &self.params - } +struct InnerSink

{ + /// Sink. + sink: mpsc::UnboundedSender, + /// Params. + params: Option

, } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index c2f3bb5291..47f7db765b 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -27,7 +27,7 @@ use futures_channel::mpsc; use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; -use serde::{Serialize, de::DeserializeOwned}; +use serde::{de::DeserializeOwned, Serialize}; use soketto::handshake::{server::Response, Server as SokettoServer}; use std::{net::SocketAddr, sync::Arc}; use tokio::net::{TcpListener, ToSocketAddrs}; From 89ba83372b53316d4728446134ad9fea2151a782 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 19 May 2021 22:11:16 +0200 Subject: [PATCH 08/25] fix test build --- tests/tests/helpers.rs | 15 ++++++++++----- ws-server/src/tests.rs | 6 +++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 0adbaa4cd5..fb035fd0e9 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -25,7 +25,10 @@ // DEALINGS IN THE SOFTWARE. use futures_channel::oneshot; -use jsonrpsee::{http_server::HttpServerBuilder, ws_server::WsServer}; +use jsonrpsee::{ + http_server::HttpServerBuilder, + ws_server::{SubscriptionSink, WsServer}, +}; use std::net::SocketAddr; use std::time::Duration; @@ -36,8 +39,10 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { let rt = tokio::runtime::Runtime::new().unwrap(); let mut server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); - let mut sub_hello = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); - let mut sub_foo = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); + let mut sub_hello: SubscriptionSink<()> = + server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); + let mut sub_foo: SubscriptionSink<()> = + server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); server.register_method("say_hello", |_| Ok("hello")).unwrap(); @@ -49,8 +54,8 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { loop { tokio::time::sleep(Duration::from_millis(100)).await; - sub_hello.send(&"hello from subscription").unwrap(); - sub_foo.send(&1337_u64).unwrap(); + sub_hello.send_all(&"hello from subscription").unwrap(); + sub_foo.send_all(&1337_u64).unwrap(); } }); }); diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index 12d0a01d90..771413c4ce 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -229,8 +229,8 @@ async fn register_methods_works() { let mut server = WsServer::new("127.0.0.1:0").await.unwrap(); assert!(server.register_method("say_hello", |_| Ok("lo")).is_ok()); assert!(server.register_method("say_hello", |_| Ok("lo")).is_err()); - assert!(server.register_subscription("subscribe_hello", "unsubscribe_hello").is_ok()); - assert!(server.register_subscription("subscribe_hello_again", "unsubscribe_hello").is_err()); + assert!(server.register_subscription::<()>("subscribe_hello", "unsubscribe_hello").is_ok()); + assert!(server.register_subscription::<()>("subscribe_hello_again", "unsubscribe_hello").is_err()); assert!( server.register_method("subscribe_hello_again", |_| Ok("lo")).is_ok(), "Failed register_subscription should not have side-effects" @@ -241,7 +241,7 @@ async fn register_methods_works() { async fn register_same_subscribe_unsubscribe_is_err() { let mut server = WsServer::new("127.0.0.1:0").await.unwrap(); assert!(matches!( - server.register_subscription("subscribe_hello", "subscribe_hello"), + server.register_subscription::<()>("subscribe_hello", "subscribe_hello"), Err(Error::SubscriptionNameConflict(_)) )); } From fa16a5c61d077ac9258fb2d650ba4253049aab91 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 19 May 2021 23:19:46 +0200 Subject: [PATCH 09/25] add test for subscription with param --- examples/ws_subscription.rs | 2 +- tests/tests/helpers.rs | 7 +++++-- tests/tests/integration_tests.rs | 15 +++++++++++++++ utils/src/server/rpc_module.rs | 2 +- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 73bd687ece..a1506aaf66 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let mut server = WsServer::new("127.0.0.1:0").await?; - let mut subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); + let subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { subscription.send_all(&"hello my friend").unwrap(); diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index fb035fd0e9..aef24fe872 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -39,10 +39,12 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { let rt = tokio::runtime::Runtime::new().unwrap(); let mut server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); - let mut sub_hello: SubscriptionSink<()> = + let sub_hello: SubscriptionSink<()> = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); - let mut sub_foo: SubscriptionSink<()> = + let sub_foo: SubscriptionSink<()> = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); + let sub_add_one: SubscriptionSink = + server.register_subscription("subscribe_add_one", "unsubscribe_add_one").unwrap(); server.register_method("say_hello", |_| Ok("hello")).unwrap(); @@ -56,6 +58,7 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { sub_hello.send_all(&"hello from subscription").unwrap(); sub_foo.send_all(&1337_u64).unwrap(); + sub_add_one.send_all_with_params(|p| *p + 1).unwrap(); } }); }); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index e809ef8318..a1a50b59d0 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -54,6 +54,21 @@ async fn ws_subscription_works() { } } +#[tokio::test] +async fn ws_subscription_with_input_works() { + let server_addr = websocket_server_with_subscription().await; + let server_url = format!("ws://{}", server_addr); + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + let mut add_one: Subscription = + client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap(); + + for _ in 0..10 { + let two = add_one.next().await.unwrap(); + assert_eq!(two, 2); + } +} + + #[tokio::test] async fn ws_method_call_works() { let server_addr = websocket_server().await; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index cd80a15fd3..28ac7ca0b4 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -230,7 +230,7 @@ impl

SubscriptionSink

{ /// If you have subscriptions with params/input you should most likely /// call `call_with_params` to the process the input/params and send out /// the result on each subscription individually instead. - pub fn send_all(&mut self, result: &T) -> anyhow::Result<()> + pub fn send_all(&self, result: &T) -> anyhow::Result<()> where T: Serialize, { From d61f47afab4b95d5eeb0eb97d2dc8025a3310d9f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 19 May 2021 23:21:43 +0200 Subject: [PATCH 10/25] cargo fmt --- tests/tests/helpers.rs | 3 +-- tests/tests/integration_tests.rs | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index aef24fe872..1c2dec87a4 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -41,8 +41,7 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { let mut server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap(); let sub_hello: SubscriptionSink<()> = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); - let sub_foo: SubscriptionSink<()> = - server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); + let sub_foo: SubscriptionSink<()> = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); let sub_add_one: SubscriptionSink = server.register_subscription("subscribe_add_one", "unsubscribe_add_one").unwrap(); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index a1a50b59d0..7706445a62 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -68,7 +68,6 @@ async fn ws_subscription_with_input_works() { } } - #[tokio::test] async fn ws_method_call_works() { let server_addr = websocket_server().await; From aaebe6017a3f3ceebe7d1454c399bc5af361eb97 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:53:34 +0200 Subject: [PATCH 11/25] Update examples/ws_subscription.rs Co-authored-by: David --- examples/ws_subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index a1506aaf66..0c73494f9e 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -54,7 +54,7 @@ async fn main() -> anyhow::Result<()> { async fn run_server() -> anyhow::Result { let mut server = WsServer::new("127.0.0.1:0").await?; - let subscription = server.register_subscription::("subscribe_hello", "unsubscribe_hello").unwrap(); + let subscription = server.register_subscription::<()>("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { subscription.send_all(&"hello my friend").unwrap(); From fa2a82f72954e151123e60f8951f5b40220e3810 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:55:06 +0200 Subject: [PATCH 12/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 28ac7ca0b4..bdb3f6318a 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -27,6 +27,8 @@ pub type SubscriptionId = u64; /// Sink that is used to send back the result to the server for a specific method. pub type MethodSink = mpsc::UnboundedSender; +/// Map of subscribers keyed by the connection and subscription ids to an [`InnerSink`] that contains the parameters +/// they used to subscribe and the tx side of a channel used to convey results&errors back. type Subscribers

= Arc>>>; /// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or, From 59e24f5df2e32d5b2b2ee9d77ed5e350796465a1 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:55:33 +0200 Subject: [PATCH 13/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index bdb3f6318a..9e501b4225 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -31,7 +31,7 @@ pub type MethodSink = mpsc::UnboundedSender; /// they used to subscribe and the tx side of a channel used to convey results&errors back. type Subscribers

= Arc>>>; -/// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or, +/// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or, /// alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] pub struct RpcModule { From 21f3a921f5f46aefbf909ebb43680949a7c7889a Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:55:50 +0200 Subject: [PATCH 14/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 9e501b4225..f15ecf10dd 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -89,7 +89,9 @@ impl RpcModule { Ok(()) } - /// Register a new RPC subscription, with subscribe and unsubscribe methods. + /// Register a new RPC subscription, with subscribe and unsubscribe methods. Returns a [`SubscriptionSink`]. If a + /// method with the same name is already registered, an [`Error::MethodAlreadyRegistered`] is returned. + /// If the subscription does not take any parameters, set `P` to `()`. pub fn register_subscription( &mut self, subscribe_method_name: &'static str, From 57009b227da4d64c725381baf5af25ea57658589 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:56:02 +0200 Subject: [PATCH 15/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index f15ecf10dd..f13e14fd5f 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -229,7 +229,7 @@ pub struct SubscriptionSink

{ } impl

SubscriptionSink

{ - /// Send a message on all the subscriptions + /// Send a message on all the subscribers. /// /// If you have subscriptions with params/input you should most likely /// call `call_with_params` to the process the input/params and send out From c8316616425788b73d49c2c55d0fed2c240e30e1 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 09:56:45 +0200 Subject: [PATCH 16/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index f13e14fd5f..3436255e7a 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -266,7 +266,7 @@ impl

SubscriptionSink

{ /// Send a message to all subscriptions that could parse `P` as input. /// - /// F: is closure that you need to provide to apply on the input P. + /// F: is a closure that you need to provide to apply on the input P. pub fn send_all_with_params(&self, f: F) -> anyhow::Result<()> where F: Fn(&P) -> T, From 5269e53f55d4c246584eb989fc8138d5dafb9c7f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 10:24:11 +0200 Subject: [PATCH 17/25] grumbles --- examples/ws_sub_with_params.rs | 4 ++-- examples/ws_subscription.rs | 2 +- tests/tests/helpers.rs | 17 ++++++++++----- tests/tests/integration_tests.rs | 24 +++++++++++++++++---- utils/src/server/rpc_module.rs | 36 ++++++++++++++++---------------- 5 files changed, 53 insertions(+), 30 deletions(-) diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs index 341a010ef7..5288139c35 100644 --- a/examples/ws_sub_with_params.rs +++ b/examples/ws_sub_with_params.rs @@ -58,12 +58,12 @@ async fn run_server() -> anyhow::Result { let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); std::thread::spawn(move || loop { - let _ = one_param.send_all_with_params(|idx: &usize| LETTERS.chars().nth(*idx)); + let _ = one_param.send_each(|idx| LETTERS.chars().nth(*idx)); std::thread::sleep(std::time::Duration::from_millis(50)); }); std::thread::spawn(move || loop { - let _ = two_params.send_all_with_params(|params: &Vec| LETTERS[params[0]..params[1]].to_string()); + let _ = two_params.send_each(|params: &mut Vec| LETTERS[params[0]..params[1]].to_string()); std::thread::sleep(std::time::Duration::from_millis(100)); }); diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 0c73494f9e..6f5949da69 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -57,7 +57,7 @@ async fn run_server() -> anyhow::Result { let subscription = server.register_subscription::<()>("subscribe_hello", "unsubscribe_hello").unwrap(); std::thread::spawn(move || loop { - subscription.send_all(&"hello my friend").unwrap(); + subscription.broadcast(&"hello my friend").unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); }); diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 1c2dec87a4..0bb7f2a9ad 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -42,9 +42,10 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { let sub_hello: SubscriptionSink<()> = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); let sub_foo: SubscriptionSink<()> = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); - let sub_add_one: SubscriptionSink = + let sub_stateless_add_one: SubscriptionSink = + server.register_subscription("subscribe_stateless_add_one", "unsubscribe_stateless_add_one").unwrap(); + let sub_state_add_one: SubscriptionSink = server.register_subscription("subscribe_add_one", "unsubscribe_add_one").unwrap(); - server.register_method("say_hello", |_| Ok("hello")).unwrap(); server_started_tx.send(server.local_addr().unwrap()).unwrap(); @@ -55,9 +56,15 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { loop { tokio::time::sleep(Duration::from_millis(100)).await; - sub_hello.send_all(&"hello from subscription").unwrap(); - sub_foo.send_all(&1337_u64).unwrap(); - sub_add_one.send_all_with_params(|p| *p + 1).unwrap(); + sub_hello.broadcast(&"hello from subscription").unwrap(); + sub_foo.broadcast(&1337_u64).unwrap(); + sub_stateless_add_one.send_each(|p| *p + 1).unwrap(); + sub_state_add_one + .send_each(|p| { + *p += 1; + *p + }) + .unwrap(); } }); }); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 7706445a62..8939a44c1d 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -55,19 +55,35 @@ async fn ws_subscription_works() { } #[tokio::test] -async fn ws_subscription_with_input_works() { +async fn ws_subscription_stateless_input_works() { let server_addr = websocket_server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut add_one: Subscription = - client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap(); + let mut add_one: Subscription = client + .subscribe("subscribe_stateless_add_one", vec![1.into()].into(), "unsubscribe_stateless_add_one") + .await + .unwrap(); - for _ in 0..10 { + for _ in 0..2 { let two = add_one.next().await.unwrap(); assert_eq!(two, 2); } } +#[tokio::test] +async fn ws_subscription_input_with_state_works() { + let server_addr = websocket_server_with_subscription().await; + let server_url = format!("ws://{}", server_addr); + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); + let mut add_one: Subscription = + client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap(); + + for exp in 2..12 { + let next = add_one.next().await.unwrap(); + assert_eq!(next, exp); + } +} + #[tokio::test] async fn ws_method_call_works() { let server_addr = websocket_server().await; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 3436255e7a..6188dbb959 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -28,7 +28,7 @@ pub type SubscriptionId = u64; pub type MethodSink = mpsc::UnboundedSender; /// Map of subscribers keyed by the connection and subscription ids to an [`InnerSink`] that contains the parameters -/// they used to subscribe and the tx side of a channel used to convey results&errors back. +/// they used to subscribe and the tx side of a channel used to convey results and errors back. type Subscribers

= Arc>>>; /// Sets of JSON-RPC methods can be organized into a "module"s that are in turn registered on the server or, @@ -91,12 +91,12 @@ impl RpcModule { /// Register a new RPC subscription, with subscribe and unsubscribe methods. Returns a [`SubscriptionSink`]. If a /// method with the same name is already registered, an [`Error::MethodAlreadyRegistered`] is returned. - /// If the subscription does not take any parameters, set `P` to `()`. - pub fn register_subscription( + /// If the subscription does not take any parameters, set `Params` to `()`. + pub fn register_subscription( &mut self, subscribe_method_name: &'static str, unsubscribe_method_name: &'static str, - ) -> Result, Error> { + ) -> Result, Error> { if subscribe_method_name == unsubscribe_method_name { return Err(Error::SubscriptionNameConflict(subscribe_method_name.into())); } @@ -223,18 +223,18 @@ impl RpcContextModule { /// Used by the server to send data back to subscribers. #[derive(Clone)] -pub struct SubscriptionSink

{ +pub struct SubscriptionSink { method: &'static str, - subscribers: Subscribers

, + subscribers: Subscribers, } -impl

SubscriptionSink

{ - /// Send a message on all the subscribers. +impl SubscriptionSink { + /// Send a message on the all the subscribers. /// /// If you have subscriptions with params/input you should most likely - /// call `call_with_params` to the process the input/params and send out + /// call `send_each` to the process the input/params and send out /// the result on each subscription individually instead. - pub fn send_all(&self, result: &T) -> anyhow::Result<()> + pub fn broadcast(&self, result: &T) -> anyhow::Result<()> where T: Serialize, { @@ -250,7 +250,7 @@ impl

SubscriptionSink

{ params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, })?; - // Log broken connections + // Mark broken connections, to be removed. if sender.sink.unbounded_send(msg).is_err() { errored.push((*conn_id, *sub_id)); } @@ -267,16 +267,16 @@ impl

SubscriptionSink

{ /// Send a message to all subscriptions that could parse `P` as input. /// /// F: is a closure that you need to provide to apply on the input P. - pub fn send_all_with_params(&self, f: F) -> anyhow::Result<()> + pub fn send_each(&self, f: F) -> anyhow::Result<()> where - F: Fn(&P) -> T, + F: Fn(&mut Params) -> T, T: Serialize, { let mut subs = self.subscribers.lock(); let mut errored = Vec::new(); - for ((conn_id, sub_id), sender) in subs.iter() { - let result = match sender.params.as_ref().map(|p| to_raw_value(&f(p))) { + for ((conn_id, sub_id), sender) in subs.iter_mut() { + let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) { Some(Ok(res)) => res, _ => continue, }; @@ -287,7 +287,7 @@ impl

SubscriptionSink

{ params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, })?; - // Log broken connections + // Mark broken connections, to be removed. if sender.sink.unbounded_send(msg).is_err() { errored.push((*conn_id, *sub_id)); } @@ -302,9 +302,9 @@ impl

SubscriptionSink

{ } } -struct InnerSink

{ +struct InnerSink { /// Sink. sink: mpsc::UnboundedSender, /// Params. - params: Option

, + params: Option, } From 4e8ed839209f0a8da063f751d015ce627f0eca77 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 11:18:13 +0200 Subject: [PATCH 18/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 6188dbb959..f5f2d94aed 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -229,7 +229,7 @@ pub struct SubscriptionSink { } impl SubscriptionSink { - /// Send a message on the all the subscribers. + /// Send a message to all subscribers. /// /// If you have subscriptions with params/input you should most likely /// call `send_each` to the process the input/params and send out From ca777fb45d0717db07e8a9316bc46f358371bd90 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 11:18:24 +0200 Subject: [PATCH 19/25] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index f5f2d94aed..e9203ac5ec 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -264,7 +264,8 @@ impl SubscriptionSink { Ok(()) } - /// Send a message to all subscriptions that could parse `P` as input. + /// Send a message to all subscribers one by one, parsing the params they sent with the provided closure. If the + /// closure `F` fails to parse the params the message is not sent. /// /// F: is a closure that you need to provide to apply on the input P. pub fn send_each(&self, f: F) -> anyhow::Result<()> From 27f8cc7dcc9e0afeb97fa8b7dfe71552f9b0795c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 12:02:34 +0200 Subject: [PATCH 20/25] Update utils/src/server/rpc_module.rs Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index e9203ac5ec..bfe606eb88 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -223,7 +223,7 @@ impl RpcContextModule { /// Used by the server to send data back to subscribers. #[derive(Clone)] -pub struct SubscriptionSink { +pub struct SubscriptionSink { method: &'static str, subscribers: Subscribers, } From f27e1010e28e4043985aee64a9b6e524ed033b35 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 20 May 2021 13:10:09 +0200 Subject: [PATCH 21/25] fix more grumbles --- examples/ws_sub_with_params.rs | 4 +- tests/tests/helpers.rs | 12 +----- tests/tests/integration_tests.rs | 22 ++--------- types/src/v2/params.rs | 6 +-- utils/src/server/rpc_module.rs | 63 ++++++++++++++++++++++---------- 5 files changed, 53 insertions(+), 54 deletions(-) diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs index 5288139c35..9b07de28e7 100644 --- a/examples/ws_sub_with_params.rs +++ b/examples/ws_sub_with_params.rs @@ -58,12 +58,12 @@ async fn run_server() -> anyhow::Result { let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap(); std::thread::spawn(move || loop { - let _ = one_param.send_each(|idx| LETTERS.chars().nth(*idx)); + let _ = one_param.send_each(|idx| Ok(LETTERS.chars().nth(*idx))); std::thread::sleep(std::time::Duration::from_millis(50)); }); std::thread::spawn(move || loop { - let _ = two_params.send_each(|params: &mut Vec| LETTERS[params[0]..params[1]].to_string()); + let _ = two_params.send_each(|params: &Vec| Ok(Some(LETTERS[params[0]..params[1]].to_string()))); std::thread::sleep(std::time::Duration::from_millis(100)); }); diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 0bb7f2a9ad..fc590550ec 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -42,9 +42,7 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { let sub_hello: SubscriptionSink<()> = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap(); let sub_foo: SubscriptionSink<()> = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap(); - let sub_stateless_add_one: SubscriptionSink = - server.register_subscription("subscribe_stateless_add_one", "unsubscribe_stateless_add_one").unwrap(); - let sub_state_add_one: SubscriptionSink = + let sub_add_one: SubscriptionSink = server.register_subscription("subscribe_add_one", "unsubscribe_add_one").unwrap(); server.register_method("say_hello", |_| Ok("hello")).unwrap(); @@ -58,13 +56,7 @@ pub async fn websocket_server_with_subscription() -> SocketAddr { sub_hello.broadcast(&"hello from subscription").unwrap(); sub_foo.broadcast(&1337_u64).unwrap(); - sub_stateless_add_one.send_each(|p| *p + 1).unwrap(); - sub_state_add_one - .send_each(|p| { - *p += 1; - *p - }) - .unwrap(); + sub_add_one.send_each(|p| Ok(Some(*p + 1))).unwrap(); } }); }); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 8939a44c1d..95f1f18b18 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -55,14 +55,12 @@ async fn ws_subscription_works() { } #[tokio::test] -async fn ws_subscription_stateless_input_works() { +async fn ws_subscription_with_input_works() { let server_addr = websocket_server_with_subscription().await; let server_url = format!("ws://{}", server_addr); let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut add_one: Subscription = client - .subscribe("subscribe_stateless_add_one", vec![1.into()].into(), "unsubscribe_stateless_add_one") - .await - .unwrap(); + let mut add_one: Subscription = + client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap(); for _ in 0..2 { let two = add_one.next().await.unwrap(); @@ -70,20 +68,6 @@ async fn ws_subscription_stateless_input_works() { } } -#[tokio::test] -async fn ws_subscription_input_with_state_works() { - let server_addr = websocket_server_with_subscription().await; - let server_url = format!("ws://{}", server_addr); - let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - let mut add_one: Subscription = - client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap(); - - for exp in 2..12 { - let next = add_one.next().await.unwrap(); - assert_eq!(next, exp); - } -} - #[tokio::test] async fn ws_method_call_works() { let server_addr = websocket_server().await; diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index aa719bb6ff..85566a89e6 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -83,10 +83,8 @@ impl<'a> RpcParams<'a> { where T: Deserialize<'a>, { - match self.0 { - None => Err(CallError::InvalidParams), - Some(params) => serde_json::from_str(params).map_err(|_| CallError::InvalidParams), - } + let params = self.0.unwrap_or("null"); + serde_json::from_str(params).map_err(|_| CallError::InvalidParams) } /// Attempt to parse only the first parameter from an array into type T diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 6188dbb959..9f6e9b63b2 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -111,7 +111,13 @@ impl RpcModule { self.methods.insert( subscribe_method_name, Box::new(move |id, params, tx, conn| { - let params = params.parse().or_else(|_| params.one().map_err(|_| CallError::InvalidParams)).ok(); + let params = match params.parse().or_else(|_| params.one()) { + Ok(p) => p, + Err(err) => { + log::error!("Params={:?}, in subscription couldn't be parsed: {:?}", params, err); + return Err(err.into()); + } + }; let sub_id = { const JS_NUM_MASK: SubscriptionId = !0 >> 11; @@ -223,7 +229,7 @@ impl RpcContextModule { /// Used by the server to send data back to subscribers. #[derive(Clone)] -pub struct SubscriptionSink { +pub struct SubscriptionSink { method: &'static str, subscribers: Subscribers, } @@ -243,7 +249,7 @@ impl SubscriptionSink { let mut errored = Vec::new(); let mut subs = self.subscribers.lock(); - for ((conn_id, sub_id), sender) in subs.iter() { + for ((conn_id, sub_id), sink) in subs.iter() { let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { jsonrpc: TwoPointZero, method: self.method, @@ -251,7 +257,7 @@ impl SubscriptionSink { })?; // Mark broken connections, to be removed. - if sender.sink.unbounded_send(msg).is_err() { + if sink.send(msg).is_err() { errored.push((*conn_id, *sub_id)); } } @@ -269,27 +275,40 @@ impl SubscriptionSink { /// F: is a closure that you need to provide to apply on the input P. pub fn send_each(&self, f: F) -> anyhow::Result<()> where - F: Fn(&mut Params) -> T, + F: Fn(&Params) -> anyhow::Result>, T: Serialize, { let mut subs = self.subscribers.lock(); let mut errored = Vec::new(); - for ((conn_id, sub_id), sender) in subs.iter_mut() { - let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) { - Some(Ok(res)) => res, - _ => continue, - }; + for ((conn_id, sub_id), sink) in subs.iter() { + match f(&sink.params) { + Ok(Some(res)) => { + let result = match to_raw_value(&res) { + Ok(res) => res, + Err(err) => { + log::error!("Subscription: {} failed to serialize message: {:?}; ignoring", sub_id, err); + continue; + } + }; - let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { - jsonrpc: TwoPointZero, - method: self.method, - params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, - })?; + let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { + jsonrpc: TwoPointZero, + method: self.method, + params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, + })?; - // Mark broken connections, to be removed. - if sender.sink.unbounded_send(msg).is_err() { - errored.push((*conn_id, *sub_id)); + if sink.send(msg).is_err() { + errored.push((*conn_id, *sub_id)); + } + } + // NOTE(niklasad1): This might be used to fetch data in closure. + Ok(None) => (), + Err(e) => { + if sink.send(format!("Error: {:?}", e)).is_err() { + errored.push((*conn_id, *sub_id)); + } + } } } @@ -306,5 +325,11 @@ struct InnerSink { /// Sink. sink: mpsc::UnboundedSender, /// Params. - params: Option, + params: Params, +} + +impl InnerSink { + fn send(&self, msg: String) -> anyhow::Result<()> { + self.sink.unbounded_send(msg).map_err(Into::into) + } } From ca4ad3f1eb3d54b78776313fc740ba8c882fb900 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 21 May 2021 18:01:21 +0200 Subject: [PATCH 22/25] [subscriptionSink]: introduce into_sinks --- utils/src/server/rpc_module.rs | 35 +++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index a13dc0e38a..2d10cab710 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -269,7 +269,7 @@ impl SubscriptionSink { for ((conn_id, sub_id), sink) in subs.iter() { // Mark broken connections, to be removed. - if sink.send_subscription_message(&result).is_err() { + if sink.send_raw_value(&result).is_err() { errored.push((*conn_id, *sub_id)); } } @@ -305,14 +305,14 @@ impl SubscriptionSink { } }; - if sink.send_subscription_message(&result).is_err() { + if sink.send_raw_value(&result).is_err() { errored.push((*conn_id, *sub_id)); } } // NOTE(niklasad1): This might be used to fetch data in closure. Ok(None) => (), Err(e) => { - if sink.send(format!("Error: {:?}", e)).is_err() { + if sink.inner_send(format!("Error: {:?}", e)).is_err() { errored.push((*conn_id, *sub_id)); } } @@ -326,9 +326,19 @@ impl SubscriptionSink { Ok(()) } + + /// Consumes the current subscriptions at the given time to get access to the inner Sinks. + /// The SubscriptionSink will accept new subscriptions after this occurs. + // TODO: we should get rid of this if possible. + pub fn into_sinks(&self) -> impl IntoIterator> { + let mut subs = self.subscribers.lock(); + let take = std::mem::replace(&mut *subs, FxHashMap::default()); + take.into_iter().map(|(_, v)| v) + } } -struct InnerSink { +/// Represents a single subscription. +pub struct InnerSink { /// Sink. sink: mpsc::UnboundedSender, /// Params. @@ -340,19 +350,30 @@ struct InnerSink { } impl InnerSink { - fn send_subscription_message(&self, result: &RawValue) -> anyhow::Result<()> { + /// Send message on this subscription. + pub fn send(&self, result: &T) -> anyhow::Result<()> { + let result = to_raw_value(result)?; + self.send_raw_value(&result) + } + + fn send_raw_value(&self, result: &RawValue) -> anyhow::Result<()> { let msg = serde_json::to_string(&JsonRpcSubscriptionResponse { jsonrpc: TwoPointZero, method: self.method, params: JsonRpcNotificationParams { subscription: self.sub_id, result: &*result }, })?; - self.send(msg).map_err(Into::into) + self.inner_send(msg).map_err(Into::into) } - fn send(&self, msg: String) -> anyhow::Result<()> { + fn inner_send(&self, msg: String) -> anyhow::Result<()> { self.sink.unbounded_send(msg).map_err(Into::into) } + + /// Get params of the subscription. + pub fn params(&self) -> &Params { + &self.params + } } #[cfg(test)] From a5b86b22ef6686ab3999e8fb1d3a53da15b70cd3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 21 May 2021 18:09:58 +0200 Subject: [PATCH 23/25] use replace --- utils/src/server/rpc_module.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 2d10cab710..483560b320 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -332,8 +332,8 @@ impl SubscriptionSink { // TODO: we should get rid of this if possible. pub fn into_sinks(&self) -> impl IntoIterator> { let mut subs = self.subscribers.lock(); - let take = std::mem::replace(&mut *subs, FxHashMap::default()); - take.into_iter().map(|(_, v)| v) + let sinks = std::mem::take(&mut *subs); + sinks.into_iter().map(|(_, v)| v) } } From 5e50b8ede617e6f3477ccc4ae78e5120f44499ed Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 21 May 2021 18:50:56 +0200 Subject: [PATCH 24/25] fix more nits --- utils/src/server/rpc_module.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 483560b320..685d627e61 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -276,6 +276,7 @@ impl SubscriptionSink { // Remove broken connections for entry in errored { + log::debug!("Dropping subscription on method: {}, id: {}", self.method, entry.1); subs.remove(&entry); } @@ -321,16 +322,17 @@ impl SubscriptionSink { // Remove broken connections for entry in errored { + log::debug!("Dropping subscription on method: {}, id: {}", self.method, entry.1); subs.remove(&entry); } Ok(()) } - /// Consumes the current subscriptions at the given time to get access to the inner Sinks. - /// The SubscriptionSink will accept new subscriptions after this occurs. - // TODO: we should get rid of this if possible. - pub fn into_sinks(&self) -> impl IntoIterator> { + /// Consumes the current subscriptions at the given time to get access to the individual subscribers. + /// The [`SubscriptionSink`] will accept new subscriptions after this is called. + // TODO(niklasad1): get rid of this if possible. + pub fn to_sinks(&self) -> impl IntoIterator> { let mut subs = self.subscribers.lock(); let sinks = std::mem::take(&mut *subs); sinks.into_iter().map(|(_, v)| v) From 9249e4561195f5900eecc7245c41d739ef43ec0b Mon Sep 17 00:00:00 2001 From: David Palm Date: Wed, 26 May 2021 15:00:40 +0200 Subject: [PATCH 25/25] Remove comment --- utils/src/server/rpc_module.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 02e8d5fc1e..abcb2f5606 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -311,7 +311,6 @@ impl SubscriptionSink { errored.push((*conn_id, *sub_id)); } } - // NOTE(niklasad1): This might be used to fetch data in closure. Ok(None) => (), Err(e) => { if sink.inner_send(format!("Error: {:?}", e)).is_err() {