diff --git a/http-server/src/lib.rs b/http-server/src/lib.rs index ea792bc4bf..e52eecfb3a 100644 --- a/http-server/src/lib.rs +++ b/http-server/src/lib.rs @@ -25,13 +25,12 @@ // DEALINGS IN THE SOFTWARE. mod access_control; -mod module; mod response; mod server; pub use access_control::{AccessControl, AccessControlBuilder, AllowHosts, Host}; pub use jsonrpsee_types::{Error, TEN_MB_SIZE_BYTES}; -pub use module::{RpcContextModule, RpcModule}; +pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule}; pub use server::{Builder as HttpServerBuilder, Server as HttpServer}; #[cfg(test)] diff --git a/http-server/src/module.rs b/http-server/src/module.rs deleted file mode 100644 index 0e9ad58e8e..0000000000 --- a/http-server/src/module.rs +++ /dev/null @@ -1,132 +0,0 @@ -use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; -use jsonrpsee_types::{ - error::{CallError, Error}, - traits::RpcMethod, - v2::params::RpcParams, -}; -use jsonrpsee_utils::server::{send_error, send_response, Methods}; -use serde::Serialize; -use std::sync::Arc; - -#[derive(Default)] -pub struct RpcModule { - methods: Methods, -} - -impl RpcModule { - /// Instantiate a new `RpcModule`. - pub fn new() -> Self { - RpcModule { methods: Methods::default() } - } - - /// Add context for this module, turning it into an `RpcContextModule`. - pub fn with_context(self, ctx: Context) -> RpcContextModule { - RpcContextModule { ctx: Arc::new(ctx), module: self } - } - - fn verify_method_name(&mut self, name: &str) -> Result<(), Error> { - if self.methods.get(name).is_some() { - return Err(Error::MethodAlreadyRegistered(name.into())); - } - - Ok(()) - } - - /// Register a new RPC method, which responds with a given callback. - pub fn register_method(&mut self, method_name: &'static str, callback: F) -> Result<(), Error> - where - R: Serialize, - F: RpcMethod, - { - self.verify_method_name(method_name)?; - - self.methods.insert( - method_name, - Box::new(move |id, params, tx, _| { - match callback(params) { - Ok(res) => send_response(id, tx, res), - Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()), - Err(CallError::Failed(err)) => { - log::error!("Call failed with: {}", err); - let err = JsonRpcErrorObject { - code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), - message: &err.to_string(), - data: None, - }; - send_error(id, tx, err) - } - }; - - Ok(()) - }), - ); - - Ok(()) - } - - pub(crate) fn into_methods(self) -> Methods { - self.methods - } - - pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> { - for name in other.methods.keys() { - self.verify_method_name(name)?; - } - - for (name, callback) in other.methods { - self.methods.insert(name, callback); - } - - Ok(()) - } -} - -pub struct RpcContextModule { - ctx: Arc, - module: RpcModule, -} - -impl RpcContextModule { - /// Create a new module with a given shared `Context`. - pub fn new(ctx: Context) -> Self { - RpcContextModule { ctx: Arc::new(ctx), module: RpcModule::new() } - } - - /// Register a new RPC method, which responds with a given callback. - pub fn register_method(&mut self, method_name: &'static str, callback: F) -> Result<(), Error> - where - Context: Send + Sync + 'static, - R: Serialize, - F: Fn(RpcParams, &Context) -> Result + Send + Sync + 'static, - { - self.module.verify_method_name(method_name)?; - - let ctx = self.ctx.clone(); - - self.module.methods.insert( - method_name, - Box::new(move |id, params, tx, _| { - match callback(params, &*ctx) { - Ok(res) => send_response(id, tx, res), - Err(CallError::InvalidParams) => send_error(id, tx, JsonRpcErrorCode::InvalidParams.into()), - Err(CallError::Failed(err)) => { - let err = JsonRpcErrorObject { - code: JsonRpcErrorCode::ServerError(CALL_EXECUTION_FAILED_CODE), - message: &err.to_string(), - data: None, - }; - send_error(id, tx, err) - } - }; - Ok(()) - }), - ); - - Ok(()) - } - - /// Convert this `RpcContextModule` into a regular `RpcModule` that can be registered on the `Server`. - pub fn into_module(self) -> RpcModule { - self.module - } -} diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 8de5ee8ed0..12f3038354 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -24,10 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::module::RpcModule; -use crate::response; -use crate::AccessControl; -use crate::TEN_MB_SIZE_BYTES; +use crate::{response, AccessControl, TEN_MB_SIZE_BYTES}; use anyhow::anyhow; use futures_channel::mpsc; use futures_util::stream::StreamExt; @@ -39,10 +36,10 @@ use hyper::{ use jsonrpsee_types::error::{CallError, Error, GenericTransportError}; use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams}; -use jsonrpsee_utils::{ - hyper_helpers::read_response_to_body, - server::{collect_batch_response, send_error, RpcSender}, -}; +use jsonrpsee_utils::hyper_helpers::read_response_to_body; +use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; +use jsonrpsee_utils::server::rpc_module::{MethodSink, RpcModule}; + use serde::Serialize; use socket2::{Domain, Socket, Type}; use std::{ @@ -161,7 +158,7 @@ impl Server { // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in // the params from the request. The result of the computation is sent back over the `tx` channel and // the result(s) are collected into a `String` and sent back over the wire. - let execute = move |tx: RpcSender, req: JsonRpcRequest| { + let execute = move |tx: &MethodSink, req: JsonRpcRequest| { if let Some(method) = methods.get(&*req.method) { let params = RpcParams::new(req.params.map(|params| params.get())); // NOTE(niklasad1): connection ID is unused thus hardcoded to `0`. diff --git a/types/src/v2/params.rs b/types/src/v2/params.rs index a97544078f..9e505662eb 100644 --- a/types/src/v2/params.rs +++ b/types/src/v2/params.rs @@ -191,3 +191,7 @@ impl Id { } } } + +/// Untyped JSON-RPC ID. +// TODO(niklasad1): this should be enforced to only accept: String, Number, or Null. +pub type JsonRpcRawId<'a> = Option<&'a serde_json::value::RawValue>; diff --git a/utils/Cargo.toml b/utils/Cargo.toml index f70a505e34..2554964ab7 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -15,14 +15,27 @@ hyper14 = { package = "hyper", version = "0.14", default-features = false, featu jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6", optional = true } log = { version = "0.4", optional = true } rustc-hash = { version = "1", optional = true } +rand = { version = "0.8", optional = true } serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } serde_json = { version = "1", features = ["raw_value"], optional = true } +parking_lot = { version = "0.11", optional = true } [features] default = [] hyper_13 = ["hyper13", "futures-util", "jsonrpsee-types"] hyper_14 = ["hyper14", "futures-util", "jsonrpsee-types"] -server = ["anyhow", "futures-channel", "futures-util", "jsonrpsee-types", "rustc-hash", "serde", "serde_json", "log"] +server = [ + "anyhow", + "futures-channel", + "futures-util", + "jsonrpsee-types", + "rustc-hash", + "serde", + "serde_json", + "log", + "parking_lot", + "rand" +] [dev-dependencies] serde_json = "1.0" diff --git a/utils/src/server.rs b/utils/src/server/helpers.rs similarity index 68% rename from utils/src/server.rs rename to utils/src/server/helpers.rs index 1f36c49dee..83719aac3c 100644 --- a/utils/src/server.rs +++ b/utils/src/server/helpers.rs @@ -1,27 +1,13 @@ -//! Shared helpers for JSON-RPC Servers. - +use crate::server::rpc_module::MethodSink; use futures_channel::mpsc; use futures_util::stream::StreamExt; use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject}; -use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero}; +use jsonrpsee_types::v2::params::{JsonRpcRawId, TwoPointZero}; use jsonrpsee_types::v2::response::JsonRpcResponse; -use rustc_hash::FxHashMap; use serde::Serialize; -use serde_json::value::RawValue; - -/// Connection ID. -pub type ConnectionId = usize; -/// Sender. -pub type RpcSender<'a> = &'a mpsc::UnboundedSender; -/// RPC ID. -pub type RpcId<'a> = Option<&'a RawValue>; -/// Method registered in the server. -pub type Method = Box anyhow::Result<()>>; -/// Methods registered in the Server. -pub type Methods = FxHashMap<&'static str, Method>; /// Helper for sending JSON-RPC responses to the client -pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) { +pub fn send_response(id: JsonRpcRawId, tx: &MethodSink, result: impl Serialize) { let json = match serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, id, result }) { Ok(json) => json, Err(err) => { @@ -37,7 +23,7 @@ pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) { } /// Helper for sending JSON-RPC errors to the client -pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) { +pub fn send_error(id: JsonRpcRawId, tx: &MethodSink, error: JsonRpcErrorObject) { let json = match serde_json::to_string(&JsonRpcError { jsonrpc: TwoPointZero, error, id }) { Ok(json) => json, Err(err) => { diff --git a/utils/src/server/mod.rs b/utils/src/server/mod.rs new file mode 100644 index 0000000000..0f870abd48 --- /dev/null +++ b/utils/src/server/mod.rs @@ -0,0 +1,6 @@ +//! Shared modules for the JSON-RPC servers. + +/// Helpers. +pub mod helpers; +/// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration. +pub mod rpc_module; diff --git a/ws-server/src/server/module.rs b/utils/src/server/rpc_module.rs similarity index 59% rename from ws-server/src/server/module.rs rename to utils/src/server/rpc_module.rs index 540117a579..ad0fe323d1 100644 --- a/ws-server/src/server/module.rs +++ b/utils/src/server/rpc_module.rs @@ -1,15 +1,36 @@ -use crate::server::{RpcParams, SubscriptionId, SubscriptionSink}; -use jsonrpsee_types::{ - error::{CallError, Error}, - v2::error::{JsonRpcErrorCode, JsonRpcErrorObject}, -}; -use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE}; -use jsonrpsee_utils::server::{send_error, send_response, Methods}; +use crate::server::helpers::{send_error, send_response}; +use futures_channel::mpsc; +use jsonrpsee_types::error::{CallError, Error}; +use jsonrpsee_types::traits::RpcMethod; +use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE}; +use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, JsonRpcRawId, RpcParams, TwoPointZero}; +use jsonrpsee_types::v2::request::JsonRpcNotification; + use parking_lot::Mutex; use rustc_hash::FxHashMap; use serde::Serialize; +use serde_json::value::to_raw_value; use std::sync::Arc; +/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request, +/// implemented as a function pointer to a `Fn` function taking four arguments: +/// the `id`, `params`, a channel the function uses to communicate the result (or error) +/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). +pub type Method = Box anyhow::Result<()>>; +/// A collection of registered [`Method`]s. +pub type Methods = FxHashMap<&'static str, Method>; +/// Connection ID, used for stateful protocol such as WebSockets. +/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. +pub type ConnectionId = usize; +/// Subscription ID. +pub type SubscriptionId = u64; +/// Sink that is used to send back the result to the server for a specific method. +pub type MethodSink = mpsc::UnboundedSender; + +type Subscribers = Arc>>; + +/// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or, +/// alternatively, merged with other modules to construct a cohesive API. #[derive(Default)] pub struct RpcModule { methods: Methods, @@ -122,11 +143,14 @@ impl RpcModule { Ok(SubscriptionSink { method: subscribe_method_name, subscribers }) } - pub(crate) fn into_methods(self) -> Methods { + /// Convert a module into methods. + pub fn into_methods(self) -> Methods { self.methods } - pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> { + /// Merge two [`RpcModule`]'s by adding all [`Method`]s from `other` into `self`. + /// Fails if any of the methods in `other` is present already. + pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> { for name in other.methods.keys() { self.verify_method_name(name)?; } @@ -139,6 +163,8 @@ impl RpcModule { } } +/// Similar to [`RpcModule`] but wraps an additional context argument that can be used +/// to access data during call execution. pub struct RpcContextModule { ctx: Arc, module: RpcModule, @@ -188,3 +214,44 @@ impl RpcContextModule { self.module } } + +/// Used by the server to send data back to subscribers. +#[derive(Clone)] +pub struct SubscriptionSink { + method: &'static str, + subscribers: Subscribers, +} + +impl SubscriptionSink { + /// Send data back to subscribers. + /// If a send fails (likely a broken connection) the subscriber is removed from the sink. + /// O(n) in the number of subscribers. + pub fn send(&mut self, result: &T) -> anyhow::Result<()> + where + T: Serialize, + { + let result = to_raw_value(result)?; + + let mut errored = Vec::new(); + let mut subs = self.subscribers.lock(); + + for ((conn_id, sub_id), sender) in subs.iter() { + let msg = serde_json::to_string(&JsonRpcNotification { + jsonrpc: TwoPointZero, + method: self.method, + params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, + })?; + + // Track broken connections + if sender.unbounded_send(msg).is_err() { + errored.push((*conn_id, *sub_id)); + } + } + + // Remove broken connections + for entry in errored { + subs.remove(&entry); + } + Ok(()) + } +} diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index 12d2ca058a..e48acd8e38 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -16,8 +16,6 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] } jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6" } jsonrpsee-utils = { path = "../utils", version = "0.2.0-alpha.6", features = ["server"] } log = "0.4" -parking_lot = "0.11" -rand = "0.8" rustc-hash = "1.1.0" serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } diff --git a/ws-server/src/lib.rs b/ws-server/src/lib.rs index a589f137b6..dee92502f4 100644 --- a/ws-server/src/lib.rs +++ b/ws-server/src/lib.rs @@ -32,4 +32,5 @@ mod server; mod tests; pub use jsonrpsee_types::error::Error; -pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink}; +pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule, SubscriptionSink}; +pub use server::Server as WsServer; diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 024f58931c..61c0b09048 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -27,10 +27,7 @@ use futures_channel::mpsc; use futures_util::io::{BufReader, BufWriter}; use futures_util::stream::StreamExt; -use parking_lot::Mutex; -use rustc_hash::FxHashMap; use serde::Serialize; -use serde_json::value::to_raw_value; use soketto::handshake::{server::Response, Server as SokettoServer}; use std::net::SocketAddr; use std::sync::Arc; @@ -40,54 +37,10 @@ use tokio_util::compat::TokioAsyncReadCompatExt; use jsonrpsee_types::error::{CallError, Error}; use jsonrpsee_types::v2::error::JsonRpcErrorCode; -use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, RpcParams, TwoPointZero}; -use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcNotification, JsonRpcRequest}; -use jsonrpsee_utils::server::{collect_batch_response, send_error, ConnectionId, Methods, RpcSender}; - -mod module; - -pub use module::{RpcContextModule, RpcModule}; - -type SubscriptionId = u64; -type Subscribers = Arc>>>; - -#[derive(Clone)] -pub struct SubscriptionSink { - method: &'static str, - subscribers: Subscribers, -} - -impl SubscriptionSink { - pub fn send(&mut self, result: &T) -> anyhow::Result<()> - where - T: Serialize, - { - let result = to_raw_value(result)?; - - let mut errored = Vec::new(); - let mut subs = self.subscribers.lock(); - - for ((conn_id, sub_id), sender) in subs.iter() { - let msg = serde_json::to_string(&JsonRpcNotification { - jsonrpc: TwoPointZero, - method: self.method, - params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result }, - })?; - - // Log broken connections - if sender.unbounded_send(msg).is_err() { - errored.push((*conn_id, *sub_id)); - } - } - - // Remove broken connections - for entry in errored { - subs.remove(&entry); - } - - Ok(()) - } -} +use jsonrpsee_types::v2::params::RpcParams; +use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest}; +use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error}; +use jsonrpsee_utils::server::rpc_module::{ConnectionId, MethodSink, Methods, RpcModule, SubscriptionSink}; pub struct Server { root: RpcModule, @@ -185,7 +138,7 @@ async fn background_task( // Look up the "method" (i.e. function pointer) from the registered methods and run it passing in // the params from the request. The result of the computation is sent back over the `tx` channel and // the result(s) are collected into a `String` and sent back over the wire. - let execute = move |tx: RpcSender, req: JsonRpcRequest| { + let execute = move |tx: &MethodSink, req: JsonRpcRequest| { if let Some(method) = methods.get(&*req.method) { let params = RpcParams::new(req.params.map(|params| params.get())); if let Err(err) = (method)(req.id, params, &tx, conn_id) {