Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[servers] extract rpc modules to utils #322

Merged
merged 21 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions http-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
// DEALINGS IN THE SOFTWARE.

mod access_control;
mod module;
mod response;
mod server;

pub use access_control::{AccessControl, AccessControlBuilder, AllowHosts, Host};
pub use jsonrpsee_types::{Error, TEN_MB_SIZE_BYTES};
pub use module::{RpcContextModule, RpcModule};
pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule};
pub use server::{Builder as HttpServerBuilder, Server as HttpServer};

#[cfg(test)]
Expand Down
132 changes: 0 additions & 132 deletions http-server/src/module.rs

This file was deleted.

15 changes: 6 additions & 9 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::module::RpcModule;
use crate::response;
use crate::AccessControl;
use crate::TEN_MB_SIZE_BYTES;
use crate::{response, AccessControl, TEN_MB_SIZE_BYTES};
use anyhow::anyhow;
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
Expand All @@ -39,10 +36,10 @@ use hyper::{
use jsonrpsee_types::error::{CallError, Error, GenericTransportError};
use jsonrpsee_types::v2::request::{JsonRpcInvalidRequest, JsonRpcRequest};
use jsonrpsee_types::v2::{error::JsonRpcErrorCode, params::RpcParams};
use jsonrpsee_utils::{
hyper_helpers::read_response_to_body,
server::{collect_batch_response, send_error, RpcSender},
};
use jsonrpsee_utils::hyper_helpers::read_response_to_body;
use jsonrpsee_utils::server::helpers::{collect_batch_response, send_error};
use jsonrpsee_utils::server::rpc_module::{MethodSink, RpcModule};

use serde::Serialize;
use socket2::{Domain, Socket, Type};
use std::{
Expand Down Expand Up @@ -161,7 +158,7 @@ impl Server {
// Look up the "method" (i.e. function pointer) from the registered methods and run it passing in
// the params from the request. The result of the computation is sent back over the `tx` channel and
// the result(s) are collected into a `String` and sent back over the wire.
let execute = move |tx: RpcSender, req: JsonRpcRequest| {
let execute = move |tx: &MethodSink, req: JsonRpcRequest| {
if let Some(method) = methods.get(&*req.method) {
let params = RpcParams::new(req.params.map(|params| params.get()));
// NOTE(niklasad1): connection ID is unused thus hardcoded to `0`.
Expand Down
4 changes: 4 additions & 0 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,7 @@ impl Id {
}
}
}

/// Untyped JSON-RPC ID.
// TODO(niklasad1): this should be enforced to only accept: String, Number, or Null.
pub type JsonRpcRawId<'a> = Option<&'a serde_json::value::RawValue>;
15 changes: 14 additions & 1 deletion utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,27 @@ hyper14 = { package = "hyper", version = "0.14", default-features = false, featu
jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6", optional = true }
log = { version = "0.4", optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1", features = ["raw_value"], optional = true }
parking_lot = { version = "0.11", optional = true }

[features]
default = []
hyper_13 = ["hyper13", "futures-util", "jsonrpsee-types"]
hyper_14 = ["hyper14", "futures-util", "jsonrpsee-types"]
server = ["anyhow", "futures-channel", "futures-util", "jsonrpsee-types", "rustc-hash", "serde", "serde_json", "log"]
server = [
"anyhow",
"futures-channel",
"futures-util",
"jsonrpsee-types",
"rustc-hash",
"serde",
"serde_json",
"log",
"parking_lot",
"rand"
]

[dev-dependencies]
serde_json = "1.0"
Expand Down
22 changes: 4 additions & 18 deletions utils/src/server.rs → utils/src/server/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
//! Shared helpers for JSON-RPC Servers.

use crate::server::rpc_module::MethodSink;
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use jsonrpsee_types::v2::error::{JsonRpcError, JsonRpcErrorCode, JsonRpcErrorObject};
use jsonrpsee_types::v2::params::{RpcParams, TwoPointZero};
use jsonrpsee_types::v2::params::{JsonRpcRawId, TwoPointZero};
use jsonrpsee_types::v2::response::JsonRpcResponse;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::RawValue;

/// Connection ID.
pub type ConnectionId = usize;
/// Sender.
pub type RpcSender<'a> = &'a mpsc::UnboundedSender<String>;
/// RPC ID.
pub type RpcId<'a> = Option<&'a RawValue>;
/// Method registered in the server.
pub type Method = Box<dyn Send + Sync + Fn(RpcId, RpcParams, RpcSender, ConnectionId) -> anyhow::Result<()>>;
/// Methods registered in the Server.
pub type Methods = FxHashMap<&'static str, Method>;

/// Helper for sending JSON-RPC responses to the client
pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) {
pub fn send_response(id: JsonRpcRawId, tx: &MethodSink, result: impl Serialize) {
let json = match serde_json::to_string(&JsonRpcResponse { jsonrpc: TwoPointZero, id, result }) {
Ok(json) => json,
Err(err) => {
Expand All @@ -37,7 +23,7 @@ pub fn send_response(id: RpcId, tx: RpcSender, result: impl Serialize) {
}

/// Helper for sending JSON-RPC errors to the client
pub fn send_error(id: RpcId, tx: RpcSender, error: JsonRpcErrorObject) {
pub fn send_error(id: JsonRpcRawId, tx: &MethodSink, error: JsonRpcErrorObject) {
let json = match serde_json::to_string(&JsonRpcError { jsonrpc: TwoPointZero, error, id }) {
Ok(json) => json,
Err(err) => {
Expand Down
6 changes: 6 additions & 0 deletions utils/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Shared modules for the JSON-RPC servers.

/// Helpers.
pub mod helpers;
/// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration.
pub mod rpc_module;
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
use crate::server::{RpcParams, SubscriptionId, SubscriptionSink};
use jsonrpsee_types::{
error::{CallError, Error},
v2::error::{JsonRpcErrorCode, JsonRpcErrorObject},
};
use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_utils::server::{send_error, send_response, Methods};
use crate::server::helpers::{send_error, send_response};
use futures_channel::mpsc;
use jsonrpsee_types::error::{CallError, Error};
use jsonrpsee_types::traits::RpcMethod;
use jsonrpsee_types::v2::error::{JsonRpcErrorCode, JsonRpcErrorObject, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::v2::params::{JsonRpcNotificationParams, JsonRpcRawId, RpcParams, TwoPointZero};
use jsonrpsee_types::v2::request::JsonRpcNotification;

use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde_json::value::to_raw_value;
use std::sync::Arc;

/// A `Method` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub type Method = Box<dyn Send + Sync + Fn(JsonRpcRawId, RpcParams, &MethodSink, ConnectionId) -> anyhow::Result<()>>;
/// A collection of registered [`Method`]s.
pub type Methods = FxHashMap<&'static str, Method>;
/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
/// Subscription ID.
pub type SubscriptionId = u64;
/// Sink that is used to send back the result to the server for a specific method.
pub type MethodSink = mpsc::UnboundedSender<String>;

type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), MethodSink>>>;

/// Sets of JSON-RPC methods can be organized into a "module" that are in turn registered on server or,
/// alternatively, merged with other modules to construct a cohesive API.
#[derive(Default)]
pub struct RpcModule {
methods: Methods,
Expand Down Expand Up @@ -122,11 +143,14 @@ impl RpcModule {
Ok(SubscriptionSink { method: subscribe_method_name, subscribers })
}

pub(crate) fn into_methods(self) -> Methods {
/// Convert a module into methods.
pub fn into_methods(self) -> Methods {
self.methods
}

pub(crate) fn merge(&mut self, other: RpcModule) -> Result<(), Error> {
/// Merge two [`RpcModule`]'s by adding all [`Method`]s from `other` into `self`.
/// Fails if any of the methods in `other` is present already.
pub fn merge(&mut self, other: RpcModule) -> Result<(), Error> {
for name in other.methods.keys() {
self.verify_method_name(name)?;
}
Expand All @@ -139,6 +163,8 @@ impl RpcModule {
}
}

/// Similar to [`RpcModule`] but wraps an additional context argument that can be used
/// to access data during call execution.
pub struct RpcContextModule<Context> {
ctx: Arc<Context>,
module: RpcModule,
Expand Down Expand Up @@ -188,3 +214,44 @@ impl<Context> RpcContextModule<Context> {
self.module
}
}

/// Used by the server to send data back to subscribers.
#[derive(Clone)]
pub struct SubscriptionSink {
method: &'static str,
subscribers: Subscribers,
}

impl SubscriptionSink {
/// Send data back to subscribers.
/// If a send fails (likely a broken connection) the subscriber is removed from the sink.
/// O(n) in the number of subscribers.
pub fn send<T>(&mut self, result: &T) -> anyhow::Result<()>
where
T: Serialize,
{
let result = to_raw_value(result)?;

let mut errored = Vec::new();
let mut subs = self.subscribers.lock();

for ((conn_id, sub_id), sender) in subs.iter() {
let msg = serde_json::to_string(&JsonRpcNotification {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: *sub_id, result: &*result },
})?;

// Track broken connections
if sender.unbounded_send(msg).is_err() {
errored.push((*conn_id, *sub_id));
}
}

// Remove broken connections
for entry in errored {
subs.remove(&entry);
}
Ok(())
}
}
2 changes: 0 additions & 2 deletions ws-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ futures-util = { version = "0.3", default-features = false, features = ["io"] }
jsonrpsee-types = { path = "../types", version = "0.2.0-alpha.6" }
jsonrpsee-utils = { path = "../utils", version = "0.2.0-alpha.6", features = ["server"] }
log = "0.4"
parking_lot = "0.11"
rand = "0.8"
rustc-hash = "1.1.0"
serde = { version = "1", default-features = false, features = ["derive"] }
serde_json = { version = "1", features = ["raw_value"] }
Expand Down
3 changes: 2 additions & 1 deletion ws-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ mod server;
mod tests;

pub use jsonrpsee_types::error::Error;
pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink};
pub use jsonrpsee_utils::server::rpc_module::{Methods, RpcContextModule, RpcModule, SubscriptionSink};
pub use server::Server as WsServer;
Loading