From 286edbe070fafde6f2a691fa70ff53b9e6c1c743 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 11:26:23 +0200 Subject: [PATCH 01/20] [rpc server]: extract rpc_module to utils. This commit extracts the `rpc_module` from the servers to be shared. It will help to re-use rpc modules within both the servers --- http-server/src/lib.rs | 3 +- http-server/src/module.rs | 132 ------------------ http-server/src/server.rs | 14 +- utils/Cargo.toml | 15 +- utils/src/{server.rs => server/helpers.rs} | 17 +-- utils/src/server/mod.rs | 11 ++ .../src/server/rpc_module.rs | 75 ++++++++-- ws-server/Cargo.toml | 2 - ws-server/src/lib.rs | 3 +- ws-server/src/server.rs | 56 +------- 10 files changed, 107 insertions(+), 221 deletions(-) delete mode 100644 http-server/src/module.rs rename utils/src/{server.rs => server/helpers.rs} (74%) create mode 100644 utils/src/server/mod.rs rename ws-server/src/server/module.rs => utils/src/server/rpc_module.rs (67%) diff --git a/http-server/src/lib.rs b/http-server/src/lib.rs index ea792bc4bf..e52eecfb3a 100644 --- a/http-server/src/lib.rs +++ b/http-server/src/lib.rs @@ -25,13 +25,12 @@ // DEALINGS IN THE SOFTWARE. mod access_control; -mod module; mod response; mod server; pub use access_control::{AccessControl, AccessControlBuilder, AllowHosts, Host}; pub use jsonrpsee_types::{Error, TEN_MB_SIZE_BYTES}; -pub use module::{RpcContextModule, RpcModule}; +pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule}; pub use server::{Builder as HttpServerBuilder, Server as HttpServer}; #[cfg(test)] diff --git a/http-server/src/module.rs b/http-server/src/module.rs deleted file mode 100644 index 0e9ad58e8e..0000000000 --- a/http-server/src/module.rs +++ /dev/null @@ -1,132 +0,0 @@ -use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; -use jsonrpsee_types::{ - error::{CallError, Error}, - traits::RpcMethod, - v2::params::RpcParams, -}; -use jsonrpsee_utils::server::{send_error, send_response, Methods}; -use serde::Serialize; -use std::sync::Arc; - -#[derive(Default)] -pub struct RpcModule { - methods: Methods, -} - -impl RpcModule { - /// Instantiate a new `RpcModule`. - pub fn new() -> Self { - RpcModule { methods: Methods::default() } - } - - /// Add context for this module, turning it into an `RpcContextModule`. - pub fn with_context(self, ctx: Context) -> RpcContextModule { - RpcContextModule { ctx: Arc::new(ctx), module: self } - } - - fn verify_method_name(&mut self, name: &str) -> Result<(), Error> { - if self.methods.get(name).is_some() { - return Err(Error::MethodAlreadyRegistered(name.into())); - } - - Ok(()) - } - - /// Register a new RPC method, which responds with a given callback. - pub fn register_method(&mut self, method_name: &'static str, callback: F) -> Result<(), Error> - where - R: Serialize, - F: RpcMethod, - { - self.verify_method_name(method_name)?; - - self.methods.insert( - method_name, - Box::new(move |id, params, tx, _| { - match callback(params) { - Ok(res) => send_response(id, tx, res), - Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()), - Err(CallError::Failed(err)) => { - log::error!("Call failed with: {}", err); - let err = JsonRpcErrorObject { - code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), - message: &err.to_string(), - data: None, - }; - send_error(id, tx, err) - } - }; - - Ok(()) - }), - ); - - Ok(()) - } - - pub(crate) fn into_methods(self) -> Methods { - self.methods - } - - pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> { - for name in other.methods.keys() { - self.verify_method_name(name)?; - } - - for (name, callback) in other.methods { - self.methods.insert(name, callback); - } - - Ok(()) - } -} - -pub struct RpcContextModule { - ctx: Arc, - module: RpcModule, -} - -impl RpcContextModule { - /// Create a new module with a given shared `Context`. - pub fn new(ctx: Context) -> Self { - RpcContextModule { ctx: Arc::new(ctx), module: RpcModule::new() } - } - - /// Register a new RPC method, which responds with a given callback. - pub fn register_method(&mut self, method_name: &'static str, callback: F) -> Result<(), Error> - where - Context: Send + Sync + 'static, - R: Serialize, - F: Fn(RpcParams, &Context) -> Result + Send + Sync + 'static, - { - self.module.verify_method_name(method_name)?; - - let ctx = self.ctx.clone(); - - self.module.methods.insert( - method_name, - Box::new(move |id, params, tx, _| { - match callback(params, &*ctx) { - Ok(res) => send_response(id, tx, res), - Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()), - Err(CallError::Failed(err)) => { - let err = JsonRpcErrorObject { - code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), - message: &err.to_string(), - data: None, - }; - send_error(id, tx, err) - } - }; - Ok(()) - }), - ); - - Ok(()) - } - - /// Convert this `RpcContextModule` into a regular `RpcModule` that can be registered on the `Server`. - pub fn into_module(self) -> RpcModule { - self.module - } -} diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 8de5ee8ed0..ff15c8e760 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -24,10 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::module::RpcModule; -use crate::response; -use crate::AccessControl; -use crate::TEN_MB_SIZE_BYTES; +use crate::{response, AccessControl, TEN_MB_SIZE_BYTES}; use anyhow::anyhow; use futures_channel::mpsc; use futures_util::stream::StreamExt; @@ -39,10 +36,11 @@ use hyper::{ use jsonrpsee_types::error::{CallError, Error, GenericTransportError}; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; -use jsonrpsee_utils::{ - hyper_helpers::read_response_to_body, - server::{collect_batch_response, send_error, RpcSender}, -}; +use jsonrpsee_utils::hyper_helpers::read_response_to_body; +use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; +use jsonrpsee_utils::server::rpc_module::RpcModule; +use jsonrpsee_utils::server::RpcSender; + use serde::Serialize; use socket2::{Domain, Socket, Type}; use std::{ diff --git a/utils/Cargo.toml b/utils/Cargo.toml index f70a505e34..2554964ab7 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -15,14 +15,27 @@ hyper14 = { package = "hyper", version = "0.14", default-features = false, featu jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6", optional = true } log = { version = "0.4", optional = true } rustc-hash = { version = "1", optional = true } +rand = { version = "0.8", optional = true } serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } serde_json = { version = "1", features = ["raw_value"], optional = true } +parking_lot = { version = "0.11", optional = true } [features] default = [] hyper_13 = ["hyper13", "futures-util", "jsonrpsee-types"] hyper_14 = ["hyper14", "futures-util", "jsonrpsee-types"] -server = ["anyhow", "futures-channel", "futures-util", "jsonrpsee-types", "rustc-hash", "serde", "serde_json", "log"] +server = [ + "anyhow", + "futures-channel", + "futures-util", + "jsonrpsee-types", + "rustc-hash", + "serde", + "serde_json", + "log", + "parking_lot", + "rand" +] [dev-dependencies] serde_json = "1.0" diff --git a/utils/src/server.rs b/utils/src/server/helpers.rs similarity index 74% rename from utils/src/server.rs rename to utils/src/server/helpers.rs index 1f36c49dee..0882eceaee 100644 --- a/utils/src/server.rs +++ b/utils/src/server/helpers.rs @@ -1,24 +1,11 @@ -//! Shared helpers for JSON-RPC Servers. +use crate::server::{RpcId, RpcSender}; use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject}; -use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero}; +use jsonrpsee_types::v2::params::TwoPointZero; use jsonrpsee_types::v2::response::JsonRpcResponse; -use rustc_hash::FxHashMap; use serde::Serialize; -use serde_json::value::RawValue; - -/// Connection ID. -pub type ConnectionId = usize; -/// Sender. -pub type RpcSender<'a> = &'a mpsc::UnboundedSender; -/// RPC ID. -pub type RpcId<'a> = Option<&'a RawValue>; -/// Method registered in the server. -pub type Method = Box anyhow::Result<()>>; -/// Methods registered in the Server. -pub type Methods = FxHashMap<&'static str, Method>; /// Helper for sending JSON-RPC responses to the client pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) { diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs new file mode 100644 index 0000000000..dab3bc867b --- /dev/null +++ b/utils/src/server/mod.rs @@ -0,0 +1,11 @@ +//! Shared modules for the JSON-RPC servers. + +/// Helpers. +pub mod helpers; +/// Abstract JSON-RPC modules that can be used to register methods on a server. +pub mod rpc_module; + +/// Sender. +pub type RpcSender<'a> = &'a futures_channel::mpsc::UnboundedSender; +/// RPC ID. +pub type RpcId<'a> = Option<&'a serde_json::value::RawValue>; diff --git a/ws-server/src/server/module.rs b/utils/src/server/rpc_module.rs similarity index 67% rename from ws-server/src/server/module.rs rename to utils/src/server/rpc_module.rs index 540117a579..486e8c34bc 100644 --- a/ws-server/src/server/module.rs +++ b/utils/src/server/rpc_module.rs @@ -1,15 +1,29 @@ -use crate::server::{RpcParams, SubscriptionId, SubscriptionSink}; -use jsonrpsee_types::{ - error::{CallError, Error}, - v2::error::{JsonRpcErrorCode, JsonRpcErrorObject}, -}; -use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE}; -use jsonrpsee_utils::server::{send_error, send_response, Methods}; +use crate::server::helpers::{send_error, send_response}; +use crate::server::{RpcId, RpcSender}; +use futures_channel::mpsc; +use jsonrpsee_types::error::{CallError, Error}; +use jsonrpsee_types::traits::RpcMethod; +use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; +use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero}; +use jsonrpsee_types::v2::request::JsonRpcNotification; + use parking_lot::Mutex; use rustc_hash::FxHashMap; use serde::Serialize; +use serde_json::value::to_raw_value; use std::sync::Arc; +/// Method. +pub type Method = Box anyhow::Result<()>>; +/// Methods registered. +pub type Methods = FxHashMap<&'static str, Method>; +/// Connection ID. +pub type ConnectionId = usize; +/// Subscription ID. +pub type SubscriptionId = u64; +type Subscribers = Arc>>>; + +/// Abstract JSON-RPC module that be registered on server or merged with other modules. #[derive(Default)] pub struct RpcModule { methods: Methods, @@ -122,11 +136,13 @@ impl RpcModule { Ok(SubscriptionSink { method: subscribe_method_name, subscribers }) } - pub(crate) fn into_methods(self) -> Methods { + /// Convert a module into methods. + pub fn into_methods(self) -> Methods { self.methods } - pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> { + /// Merge modules. + pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> { for name in other.methods.keys() { self.verify_method_name(name)?; } @@ -139,6 +155,8 @@ impl RpcModule { } } +/// Similar to [`RpcModule`] but it wraps additional context that can used +/// embed specific data that can be accessed while a call is executed. pub struct RpcContextModule { ctx: Arc, module: RpcModule, @@ -188,3 +206,42 @@ impl RpcContextModule { self.module } } + +/// The sending end of registered subscription. +#[derive(Clone)] +pub struct SubscriptionSink { + method: &'static str, + subscribers: Subscribers, +} + +impl SubscriptionSink { + /// Send. + pub fn send(&mut self, result: &T) -> anyhow::Result<()> + where + T: Serialize, + { + let result = to_raw_value(result)?; + + let mut errored = Vec::new(); + let mut subs = self.subscribers.lock(); + + for ((conn_id, sub_id), sender) in subs.iter() { + let msg = serde_json::to_string(&JsonRpcNotification { + jsonrpc: TwoPointZero, + method: self.method, + params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, + })?; + + // Log broken connections + if sender.unbounded_send(msg).is_err() { + errored.push((*conn_id, *sub_id)); + } + } + + // Remove broken connections + for entry in errored { + subs.remove(&entry); + } + Ok(()) + } +} diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 12d2ca058a..e48acd8e38 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -16,8 +16,6 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] } jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6" } jsonrpsee-utils = { path = "../utils", version = "0.2.0-alpha.6", features = ["server"] } log = "0.4" -parking_lot = "0.11" -rand = "0.8" rustc-hash = "1.1.0" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } diff --git a/ws-server/src/lib.rs b/ws-server/src/lib.rs index a589f137b6..dee92502f4 100644 --- a/ws-server/src/lib.rs +++ b/ws-server/src/lib.rs @@ -32,4 +32,5 @@ mod server; mod tests; pub use jsonrpsee_types::error::Error; -pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink}; +pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule, SubscriptionSink}; +pub use server::Server as WsServer; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 024f58931c..3f541f5001 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -27,10 +27,7 @@ use futures_channel::mpsc; use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; -use parking_lot::Mutex; -use rustc_hash::FxHashMap; use serde::Serialize; -use serde_json::value::to_raw_value; use soketto::handshake::{server::Response, Server as SokettoServer}; use std::net::SocketAddr; use std::sync::Arc; @@ -40,54 +37,11 @@ use tokio_util::compat::TokioAsyncReadCompatExt; use jsonrpsee_types::error::{CallError, Error}; use jsonrpsee_types::v2::error::JsonRpcErrorCode; -use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero}; -use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcNotification, JsonRpcRequest}; -use jsonrpsee_utils::server::{collect_batch_response, send_error, ConnectionId, Methods, RpcSender}; - -mod module; - -pub use module::{RpcContextModule, RpcModule}; - -type SubscriptionId = u64; -type Subscribers = Arc>>>; - -#[derive(Clone)] -pub struct SubscriptionSink { - method: &'static str, - subscribers: Subscribers, -} - -impl SubscriptionSink { - pub fn send(&mut self, result: &T) -> anyhow::Result<()> - where - T: Serialize, - { - let result = to_raw_value(result)?; - - let mut errored = Vec::new(); - let mut subs = self.subscribers.lock(); - - for ((conn_id, sub_id), sender) in subs.iter() { - let msg = serde_json::to_string(&JsonRpcNotification { - jsonrpc: TwoPointZero, - method: self.method, - params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, - })?; - - // Log broken connections - if sender.unbounded_send(msg).is_err() { - errored.push((*conn_id, *sub_id)); - } - } - - // Remove broken connections - for entry in errored { - subs.remove(&entry); - } - - Ok(()) - } -} +use jsonrpsee_types::v2::params::RpcParams; +use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; +use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; +use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods, RpcModule, SubscriptionSink}; +use jsonrpsee_utils::server::RpcSender; pub struct Server { root: RpcModule, From d74fb0bd8793362783a4b805af273ac5ff537693 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:34:57 +0200 Subject: [PATCH 02/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 486e8c34bc..858b87b550 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -155,8 +155,8 @@ impl RpcModule { } } -/// Similar to [`RpcModule`] but it wraps additional context that can used -/// embed specific data that can be accessed while a call is executed. +/// Similar to [`RpcModule`] but it wraps an additional context argument that can be used +/// to access data during call execution. pub struct RpcContextModule { ctx: Arc, module: RpcModule, From cae3b8f269f57348b0aad37953eb25baf50875e7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:35:04 +0200 Subject: [PATCH 03/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 858b87b550..29856f4327 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -232,7 +232,7 @@ impl SubscriptionSink { params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, })?; - // Log broken connections + // Track broken connections if sender.unbounded_send(msg).is_err() { errored.push((*conn_id, *sub_id)); } From f26a9f3b6819e9780aad94aeeee70d52a7bbc672 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:35:18 +0200 Subject: [PATCH 04/20] Update utils/src/server/mod.rs Co-authored-by: David --- utils/src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs index dab3bc867b..b508fccd31 100644 --- a/utils/src/server/mod.rs +++ b/utils/src/server/mod.rs @@ -2,7 +2,7 @@ /// Helpers. pub mod helpers; -/// Abstract JSON-RPC modules that can be used to register methods on a server. +/// JSON-RPC "modules" groups sets of methods that belong together and handles method/subscription registration. pub mod rpc_module; /// Sender. From aba03839d4907f780bad20caab9c657de22b010e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:35:47 +0200 Subject: [PATCH 05/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 29856f4327..eb62f2e89b 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -23,7 +23,7 @@ pub type ConnectionId = usize; pub type SubscriptionId = u64; type Subscribers = Arc>>>; -/// Abstract JSON-RPC module that be registered on server or merged with other modules. +/// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] pub struct RpcModule { methods: Methods, From 4027720f402f1a35c36cb2d3bac71ce6142f99e2 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:35:58 +0200 Subject: [PATCH 06/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index eb62f2e89b..6f8281ff4c 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -207,7 +207,7 @@ impl RpcContextModule { } } -/// The sending end of registered subscription. +/// Used by the server to send data back to subscribers. #[derive(Clone)] pub struct SubscriptionSink { method: &'static str, From c4675870259e6ce9b565cc272ac2c0660f3cb312 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:36:14 +0200 Subject: [PATCH 07/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 6f8281ff4c..7d71f1e9b0 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -215,7 +215,7 @@ pub struct SubscriptionSink { } impl SubscriptionSink { - /// Send. + /// Send data back to subscribers. If a send fails (likely a broken connection) the subscriber is removed from the sink. O(n) in the number of subscribers. pub fn send(&mut self, result: &T) -> anyhow::Result<()> where T: Serialize, From 0d521f895cc4ce0824af5d421a897d6451fe3bfc Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:37:46 +0200 Subject: [PATCH 08/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 7d71f1e9b0..2a9dc87049 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -13,7 +13,7 @@ use serde::Serialize; use serde_json::value::to_raw_value; use std::sync::Arc; -/// Method. +/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, implemented as a function pointer to a `Fn` function taking four arguments: the `id`, `params`, a channel the function uses to communicate the result (or error) back to `jsonrpsee`, and the connection ID (useful for the websocket transport). pub type Method = Box anyhow::Result<()>>; /// Methods registered. pub type Methods = FxHashMap<&'static str, Method>; From a1d0e1c4607471a1b9200b1b8b3eb95f64ffa191 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:37:52 +0200 Subject: [PATCH 09/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 2a9dc87049..dfc08cc903 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -15,7 +15,7 @@ use std::sync::Arc; /// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, implemented as a function pointer to a `Fn` function taking four arguments: the `id`, `params`, a channel the function uses to communicate the result (or error) back to `jsonrpsee`, and the connection ID (useful for the websocket transport). pub type Method = Box anyhow::Result<()>>; -/// Methods registered. +/// A collection of registered [`Method`]s. pub type Methods = FxHashMap<&'static str, Method>; /// Connection ID. pub type ConnectionId = usize; From 0fbee55ab1ba893d3335ccf39bc5adf1347dad17 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:38:21 +0200 Subject: [PATCH 10/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index dfc08cc903..a099188499 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -141,7 +141,7 @@ impl RpcModule { self.methods } - /// Merge modules. + /// Merge two [`RpcModules`] by adding all [`Method`]s from `other` into `self`. Fails if any of the methods in `other` is present already. pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> { for name in other.methods.keys() { self.verify_method_name(name)?; From 80f9dc5d1e94117e11c3b1dacef6f2e44ab71da2 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 17 May 2021 21:38:29 +0200 Subject: [PATCH 11/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index a099188499..8bfcbe4651 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -17,7 +17,7 @@ use std::sync::Arc; pub type Method = Box anyhow::Result<()>>; /// A collection of registered [`Method`]s. pub type Methods = FxHashMap<&'static str, Method>; -/// Connection ID. +/// Connection ID. Set to `0` for http and to the connection ID for websockets. pub type ConnectionId = usize; /// Subscription ID. pub type SubscriptionId = u64; From 548ee6ae8f04190443ffcfc315499ccd4035930a Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 10:19:41 +0200 Subject: [PATCH 12/20] address grumbles --- types/src/v2/params.rs | 4 ++++ utils/src/server/helpers.rs | 8 +++----- utils/src/server/mod.rs | 5 ----- utils/src/server/rpc_module.rs | 18 ++++++++++++------ 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index a97544078f..9e505662eb 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -191,3 +191,7 @@ impl Id { } } } + +/// Untyped JSON-RPC ID. +// TODO(niklasad1): this should be enforced to only accept: String, Number, or Null. +pub type JsonRpcRawId<'a> = Option<&'a serde_json::value::RawValue>; diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index 0882eceaee..397158beae 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -1,14 +1,12 @@ -use crate::server::{RpcId, RpcSender}; - use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject}; -use jsonrpsee_types::v2::params::TwoPointZero; +use jsonrpsee_types::v2::params::{JsonRpcRawId, TwoPointZero}; use jsonrpsee_types::v2::response::JsonRpcResponse; use serde::Serialize; /// Helper for sending JSON-RPC responses to the client -pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) { +pub fn send_response(id: JsonRpcRawId, tx: &mpsc::UnboundedSender, result: impl Serialize) { let json = match serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, id, result }) { Ok(json) => json, Err(err) => { @@ -24,7 +22,7 @@ pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) { } /// Helper for sending JSON-RPC errors to the client -pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) { +pub fn send_error(id: JsonRpcRawId, tx: &mpsc::UnboundedSender, error: JsonRpcErrorObject) { let json = match serde_json::to_string(&JsonRpcError { jsonrpc: TwoPointZero, error, id }) { Ok(json) => json, Err(err) => { diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs index b508fccd31..a809eaf6df 100644 --- a/utils/src/server/mod.rs +++ b/utils/src/server/mod.rs @@ -4,8 +4,3 @@ pub mod helpers; /// JSON-RPC "modules" groups sets of methods that belong together and handles method/subscription registration. pub mod rpc_module; - -/// Sender. -pub type RpcSender<'a> = &'a futures_channel::mpsc::UnboundedSender; -/// RPC ID. -pub type RpcId<'a> = Option<&'a serde_json::value::RawValue>; diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 8bfcbe4651..d2594226f5 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -1,10 +1,9 @@ use crate::server::helpers::{send_error, send_response}; -use crate::server::{RpcId, RpcSender}; use futures_channel::mpsc; use jsonrpsee_types::error::{CallError, Error}; use jsonrpsee_types::traits::RpcMethod; use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; -use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero}; +use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, JsonRpcRawId, RpcParams, TwoPointZero}; use jsonrpsee_types::v2::request::JsonRpcNotification; use parking_lot::Mutex; @@ -13,15 +12,22 @@ use serde::Serialize; use serde_json::value::to_raw_value; use std::sync::Arc; -/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, implemented as a function pointer to a `Fn` function taking four arguments: the `id`, `params`, a channel the function uses to communicate the result (or error) back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type Method = Box anyhow::Result<()>>; +/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, +/// implemented as a function pointer to a `Fn` function taking four arguments: +/// the `id`, `params`, a channel the function uses to communicate the result (or error) +/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). +pub type Method = Box anyhow::Result<()>>; /// A collection of registered [`Method`]s. pub type Methods = FxHashMap<&'static str, Method>; -/// Connection ID. Set to `0` for http and to the connection ID for websockets. +/// Connection ID, used for stateful protocol such as WebSockets. +/// For stateless protocols it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; /// Subscription ID. pub type SubscriptionId = u64; -type Subscribers = Arc>>>; + +/// Sink that is used to send back the result to the server that registered this module. +type InnerSink<'a> = &'a mpsc::UnboundedSender; +type Subscribers = Arc>>; /// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] From 913b51733cce40b0d56e1c345d5038c41d28ba48 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 10:36:04 +0200 Subject: [PATCH 13/20] fix build --- http-server/src/server.rs | 5 ++--- utils/src/server/rpc_module.rs | 8 ++++---- ws-server/src/server.rs | 5 ++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index ff15c8e760..49c69ec684 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -38,8 +38,7 @@ use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; use jsonrpsee_utils::hyper_helpers::read_response_to_body; use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; -use jsonrpsee_utils::server::rpc_module::RpcModule; -use jsonrpsee_utils::server::RpcSender; +use jsonrpsee_utils::server::rpc_module::{RpcModule, MethodSink}; use serde::Serialize; use socket2::{Domain, Socket, Type}; @@ -159,7 +158,7 @@ impl Server { // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in // the params from the request. The result of the computation is sent back over the `tx` channel and // the result(s) are collected into a `String` and sent back over the wire. - let execute = move |tx: RpcSender, req: JsonRpcRequest| { + let execute = move |tx: &MethodSink, req: JsonRpcRequest| { if let Some(method) = methods.get(&*req.method) { let params = RpcParams::new(req.params.map(|params| params.get())); // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index d2594226f5..07d4440b41 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -16,7 +16,7 @@ use std::sync::Arc; /// implemented as a function pointer to a `Fn` function taking four arguments: /// the `id`, `params`, a channel the function uses to communicate the result (or error) /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). -pub type Method = Box anyhow::Result<()>>; +pub type Method = Box anyhow::Result<()>>; /// A collection of registered [`Method`]s. pub type Methods = FxHashMap<&'static str, Method>; /// Connection ID, used for stateful protocol such as WebSockets. @@ -24,10 +24,10 @@ pub type Methods = FxHashMap<&'static str, Method>; pub type ConnectionId = usize; /// Subscription ID. pub type SubscriptionId = u64; +/// Sink that is used to send back the result to the server for a specific method. +pub type MethodSink = mpsc::UnboundedSender; -/// Sink that is used to send back the result to the server that registered this module. -type InnerSink<'a> = &'a mpsc::UnboundedSender; -type Subscribers = Arc>>; +type Subscribers = Arc>>; /// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 3f541f5001..614c08bad4 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -40,8 +40,7 @@ use jsonrpsee_types::v2::error::JsonRpcErrorCode; use jsonrpsee_types::v2::params::RpcParams; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; -use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods, RpcModule, SubscriptionSink}; -use jsonrpsee_utils::server::RpcSender; +use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods, RpcModule, SubscriptionSink, MethodSink}; pub struct Server { root: RpcModule, @@ -139,7 +138,7 @@ async fn background_task( // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in // the params from the request. The result of the computation is sent back over the `tx` channel and // the result(s) are collected into a `String` and sent back over the wire. - let execute = move |tx: RpcSender, req: JsonRpcRequest| { + let execute = move |tx: &MethodSink, req: JsonRpcRequest| { if let Some(method) = methods.get(&*req.method) { let params = RpcParams::new(req.params.map(|params| params.get())); if let Err(err) = (method)(req.id, params, &tx, conn_id) { From 302a4f7d2911098caba860fee515c8a5dc380e1e Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 10:48:26 +0200 Subject: [PATCH 14/20] fix docs --- utils/src/server/rpc_module.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 07d4440b41..b3db1924d9 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -29,7 +29,8 @@ pub type MethodSink = mpsc::UnboundedSender; type Subscribers = Arc>>; -/// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, alternatively, merged with other modules to construct a cohesive API. +/// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, +/// alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] pub struct RpcModule { methods: Methods, @@ -147,7 +148,8 @@ impl RpcModule { self.methods } - /// Merge two [`RpcModules`] by adding all [`Method`]s from `other` into `self`. Fails if any of the methods in `other` is present already. + /// Merge two [`RpcModule`]'s by adding all [`Method`]s from `other` into `self`. + /// Fails if any of the methods in `other` is present already. pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> { for name in other.methods.keys() { self.verify_method_name(name)?; @@ -221,7 +223,9 @@ pub struct SubscriptionSink { } impl SubscriptionSink { - /// Send data back to subscribers. If a send fails (likely a broken connection) the subscriber is removed from the sink. O(n) in the number of subscribers. + /// Send data back to subscribers. + /// If a send fails (likely a broken connection) the subscriber is removed from the sink. + /// O(n) in the number of subscribers. pub fn send(&mut self, result: &T) -> anyhow::Result<()> where T: Serialize, From eb60f4a39d71280ba5643f8f1ae589a28e1a55a9 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 10:56:22 +0200 Subject: [PATCH 15/20] cargo fmt --- http-server/src/server.rs | 2 +- ws-server/src/server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 49c69ec684..12f3038354 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -38,7 +38,7 @@ use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; use jsonrpsee_utils::hyper_helpers::read_response_to_body; use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; -use jsonrpsee_utils::server::rpc_module::{RpcModule, MethodSink}; +use jsonrpsee_utils::server::rpc_module::{MethodSink, RpcModule}; use serde::Serialize; use socket2::{Domain, Socket, Type}; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 614c08bad4..61c0b09048 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -40,7 +40,7 @@ use jsonrpsee_types::v2::error::JsonRpcErrorCode; use jsonrpsee_types::v2::params::RpcParams; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; -use jsonrpsee_utils::server::rpc_module::{ConnectionId, Methods, RpcModule, SubscriptionSink, MethodSink}; +use jsonrpsee_utils::server::rpc_module::{ConnectionId, MethodSink, Methods, RpcModule, SubscriptionSink}; pub struct Server { root: RpcModule, From 55f13a29fa1579aef3cb93d2cd9594a489464531 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 11:45:55 +0200 Subject: [PATCH 16/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index b3db1924d9..4d114595db 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -163,7 +163,7 @@ impl RpcModule { } } -/// Similar to [`RpcModule`] but it wraps an additional context argument that can be used +/// Similar to [`RpcModule`] but wraps an additional context argument that can be used /// to access data during call execution. pub struct RpcContextModule { ctx: Arc, From 19cf90a6d7c5032cfcf28e99f939b757af3e64fd Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 11:51:43 +0200 Subject: [PATCH 17/20] grumbles: use MethodSink --- utils/src/server/helpers.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/utils/src/server/helpers.rs b/utils/src/server/helpers.rs index 397158beae..83719aac3c 100644 --- a/utils/src/server/helpers.rs +++ b/utils/src/server/helpers.rs @@ -1,3 +1,4 @@ +use crate::server::rpc_module::MethodSink; use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject}; @@ -6,7 +7,7 @@ use jsonrpsee_types::v2::response::JsonRpcResponse; use serde::Serialize; /// Helper for sending JSON-RPC responses to the client -pub fn send_response(id: JsonRpcRawId, tx: &mpsc::UnboundedSender, result: impl Serialize) { +pub fn send_response(id: JsonRpcRawId, tx: &MethodSink, result: impl Serialize) { let json = match serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, id, result }) { Ok(json) => json, Err(err) => { @@ -22,7 +23,7 @@ pub fn send_response(id: JsonRpcRawId, tx: &mpsc::UnboundedSender, resul } /// Helper for sending JSON-RPC errors to the client -pub fn send_error(id: JsonRpcRawId, tx: &mpsc::UnboundedSender, error: JsonRpcErrorObject) { +pub fn send_error(id: JsonRpcRawId, tx: &MethodSink, error: JsonRpcErrorObject) { let json = match serde_json::to_string(&JsonRpcError { jsonrpc: TwoPointZero, error, id }) { Ok(json) => json, Err(err) => { From fbcbc63a8676cf28050575a7d771e0180da4b6a8 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 11:53:15 +0200 Subject: [PATCH 18/20] Update utils/src/server/mod.rs Co-authored-by: David --- utils/src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs index a809eaf6df..0f870abd48 100644 --- a/utils/src/server/mod.rs +++ b/utils/src/server/mod.rs @@ -2,5 +2,5 @@ /// Helpers. pub mod helpers; -/// JSON-RPC "modules" groups sets of methods that belong together and handles method/subscription registration. +/// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration. pub mod rpc_module; From c3c2d29d5f66bf6c0128fe6d3aefb79891bc3027 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 11:53:25 +0200 Subject: [PATCH 19/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index 4d114595db..bf76cdfa3a 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -20,7 +20,7 @@ pub type Method = Box; /// Connection ID, used for stateful protocol such as WebSockets. -/// For stateless protocols it's unused, so feel free to set it some hardcoded value. +/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. pub type ConnectionId = usize; /// Subscription ID. pub type SubscriptionId = u64; From e41ac178b7ff715025a6b3c94734b0c9a6af5ef4 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 18 May 2021 11:53:42 +0200 Subject: [PATCH 20/20] Update utils/src/server/rpc_module.rs Co-authored-by: David --- utils/src/server/rpc_module.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/server/rpc_module.rs b/utils/src/server/rpc_module.rs index bf76cdfa3a..ad0fe323d1 100644 --- a/utils/src/server/rpc_module.rs +++ b/utils/src/server/rpc_module.rs @@ -29,7 +29,7 @@ pub type MethodSink = mpsc::UnboundedSender; type Subscribers = Arc>>; -/// Sets of JSON-RPC methods can be organized into "module" that are in turn registered on server or, +/// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or, /// alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] pub struct RpcModule {