From 33ad7783c5179f1642b4d2ae9f1b3e9ad15acb07 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 15 Jan 2025 09:48:05 +0100 Subject: [PATCH 1/6] feat: client rpc middleware --- client/http-client/Cargo.toml | 1 + client/http-client/src/client.rs | 118 +++++++++++++------------- client/http-client/src/lib.rs | 4 + client/http-client/src/rpc_service.rs | 59 +++++++++++++ client/http-client/src/transport.rs | 20 ++++- core/Cargo.toml | 5 +- core/src/lib.rs | 4 + core/src/middleware.rs | 78 +++++++++++++++++ core/src/traits.rs | 2 + examples/examples/http.rs | 22 ++++- server/src/middleware/rpc/mod.rs | 79 +---------------- 11 files changed, 252 insertions(+), 140 deletions(-) create mode 100644 client/http-client/src/rpc_service.rs create mode 100644 core/src/middleware.rs diff --git a/client/http-client/Cargo.toml b/client/http-client/Cargo.toml index 3f668d6d5e..17bf55d8d9 100644 --- a/client/http-client/Cargo.toml +++ b/client/http-client/Cargo.toml @@ -16,6 +16,7 @@ publish = true [dependencies] async-trait = { workspace = true } base64 = { workspace = true } +futures-util = { workspace = true } hyper = { workspace = true, features = ["client", "http1", "http2"] } hyper-rustls = { workspace = true, features = ["http1", "http2", "tls12", "logging", "ring"], optional = true } hyper-util = { workspace = true, features = ["client", "client-legacy", "tokio", "http1", "http2"] } diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 52a9a8ca21..3c13569408 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -24,24 +24,25 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::borrow::Cow as StdCow; use std::fmt; use std::sync::Arc; use std::time::Duration; -use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClient, HttpTransportClientBuilder}; -use crate::types::{NotificationSer, RequestSer, Response}; -use crate::{HttpRequest, HttpResponse}; +use crate::rpc_service::RpcService; +use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClientBuilder}; +use crate::types::Response; +use crate::{HttpRequest, HttpResponse, IsNotification}; use async_trait::async_trait; use hyper::body::Bytes; use hyper::http::HeaderMap; use jsonrpsee_core::client::{ - generate_batch_id_range, BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, + BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, }; +use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::params::BatchRequestBuilder; use jsonrpsee_core::traits::ToRpcParams; use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES}; -use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero}; +use jsonrpsee_types::{Id, InvalidRequestId, Request, ResponseSuccess}; use serde::de::DeserializeOwned; use tokio::sync::Semaphore; use tower::layer::util::Identity; @@ -75,7 +76,7 @@ use crate::{CertificateStore, CustomCertStore}; /// } /// ``` #[derive(Clone, Debug)] -pub struct HttpClientBuilder { +pub struct HttpClientBuilder { max_request_size: u32, max_response_size: u32, request_timeout: Duration, @@ -84,12 +85,13 @@ pub struct HttpClientBuilder { id_kind: IdKind, max_log_length: u32, headers: HeaderMap, - service_builder: tower::ServiceBuilder, + service_builder: tower::ServiceBuilder, + rpc_middleware: RpcServiceBuilder, tcp_no_delay: bool, max_concurrent_requests: Option, } -impl HttpClientBuilder { +impl HttpClientBuilder { /// Set the maximum size of a request body in bytes. Default is 10 MiB. pub fn max_request_size(mut self, size: u32) -> Self { self.max_request_size = size; @@ -215,8 +217,29 @@ impl HttpClientBuilder { self } + /// Set the RPC middleware. + pub fn set_rpc_middleware(self, rpc_builder: RpcServiceBuilder) -> HttpClientBuilder { + HttpClientBuilder { + #[cfg(feature = "tls")] + certificate_store: self.certificate_store, + id_kind: self.id_kind, + headers: self.headers, + max_log_length: self.max_log_length, + max_request_size: self.max_request_size, + max_response_size: self.max_response_size, + service_builder: self.service_builder, + rpc_middleware: rpc_builder, + request_timeout: self.request_timeout, + tcp_no_delay: self.tcp_no_delay, + max_concurrent_requests: self.max_concurrent_requests, + } + } + /// Set custom tower middleware. - pub fn set_http_middleware(self, service_builder: tower::ServiceBuilder) -> HttpClientBuilder { + pub fn set_http_middleware( + self, + service_builder: tower::ServiceBuilder, + ) -> HttpClientBuilder { HttpClientBuilder { #[cfg(feature = "tls")] certificate_store: self.certificate_store, @@ -226,6 +249,7 @@ impl HttpClientBuilder { max_request_size: self.max_request_size, max_response_size: self.max_response_size, service_builder, + rpc_middleware: self.rpc_middleware, request_timeout: self.request_timeout, tcp_no_delay: self.tcp_no_delay, max_concurrent_requests: self.max_concurrent_requests, @@ -233,16 +257,18 @@ impl HttpClientBuilder { } } -impl HttpClientBuilder +impl HttpClientBuilder where - L: Layer, + RpcMiddleware: Layer, Service = S2>, + for<'a> >>::Service: RpcServiceT<'a>, + HttpMiddleware: Layer, S: Service, Error = TransportError> + Clone, B: http_body::Body + Send + Unpin + 'static, B::Data: Send, B::Error: Into, { /// Build the HTTP client with target to connect to. - pub fn build(self, target: impl AsRef) -> Result, Error> { + pub fn build(self, target: impl AsRef) -> Result, Error> { let Self { max_request_size, max_response_size, @@ -254,10 +280,11 @@ where max_log_length, service_builder, tcp_no_delay, + rpc_middleware, .. } = self; - let transport = HttpTransportClientBuilder { + let http = HttpTransportClientBuilder { max_request_size, max_response_size, headers, @@ -266,6 +293,7 @@ where service_builder, #[cfg(feature = "tls")] certificate_store, + request_timeout, } .build(target) .map_err(|e| Error::Transport(e.into()))?; @@ -275,9 +303,8 @@ where .map(|max_concurrent_requests| Arc::new(Semaphore::new(max_concurrent_requests))); Ok(HttpClient { - transport, + transport: rpc_middleware.service(RpcService::new(http, max_response_size)), id_manager: Arc::new(RequestIdManager::new(id_kind)), - request_timeout, request_guard, }) } @@ -295,6 +322,7 @@ impl Default for HttpClientBuilder { max_log_length: 4096, headers: HeaderMap::new(), service_builder: tower::ServiceBuilder::new(), + rpc_middleware: RpcServiceBuilder::default(), tcp_no_delay: true, max_concurrent_requests: None, } @@ -310,11 +338,9 @@ impl HttpClientBuilder { /// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications. #[derive(Debug, Clone)] -pub struct HttpClient { +pub struct HttpClient { /// HTTP transport client. - transport: HttpTransportClient, - /// Request timeout. Defaults to 60sec. - request_timeout: Duration, + transport: S, /// Request ID manager. id_manager: Arc, /// Concurrent requests limit guard. @@ -329,13 +355,9 @@ impl HttpClient { } #[async_trait] -impl ClientT for HttpClient +impl ClientT for HttpClient where - S: Service, Error = TransportError> + Send + Sync + Clone, - >::Future: Send, - B: http_body::Body + Send + Unpin + 'static, - B::Error: Into, - B::Data: Send, + for<'a> S: RpcServiceT<'a> + Send + Sync, { #[instrument(name = "notification", skip(self, params), level = "trace")] async fn notification(&self, method: &str, params: Params) -> Result<(), Error> @@ -347,16 +369,11 @@ where None => None, }; let params = params.to_rpc_params()?; - let notif = - serde_json::to_string(&NotificationSer::borrowed(&method, params.as_deref())).map_err(Error::ParseError)?; - - let fut = self.transport.send(notif); + let mut request = Request::new(method.into(), params.as_deref(), Id::Null); + request.extensions_mut().insert(IsNotification); - match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(ok)) => Ok(ok), - Err(_) => Err(Error::RequestTimeout), - Ok(Err(e)) => Err(Error::Transport(e.into())), - } + self.transport.call(request).await; + Ok(()) } #[instrument(name = "method_call", skip(self, params), level = "trace")] @@ -372,23 +389,12 @@ where let id = self.id_manager.next_request_id(); let params = params.to_rpc_params()?; - let request = RequestSer::borrowed(&id, &method, params.as_deref()); - let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - - let fut = self.transport.send_and_read_body(raw); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => { - return Err(Error::RequestTimeout); - } - Ok(Err(e)) => { - return Err(Error::Transport(e.into())); - } - }; + let request = Request::new(method.into(), params.as_deref(), id.clone()); + let rp = self.transport.call(request).await; // NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get // a better error message if `R` couldn't be decoded. - let response = ResponseSuccess::try_from(serde_json::from_slice::>(&body)?)?; + let response = ResponseSuccess::try_from(serde_json::from_str::>(&rp.as_result())?)?; let result = serde_json::from_str(response.result.get()).map_err(Error::ParseError)?; @@ -404,7 +410,9 @@ where where R: DeserializeOwned + fmt::Debug + 'a, { - let _permit = match self.request_guard.as_ref() { + todo!(); + + /*let _permit = match self.request_guard.as_ref() { Some(permit) => permit.acquire().await.ok(), None => None, }; @@ -468,18 +476,14 @@ where } } - Ok(BatchResponse::new(successful_calls, responses, failed_calls)) + Ok(BatchResponse::new(successful_calls, responses, failed_calls))*/ } } #[async_trait] -impl SubscriptionClientT for HttpClient +impl SubscriptionClientT for HttpClient where - S: Service, Error = TransportError> + Send + Sync + Clone, - >::Future: Send, - B: http_body::Body + Send + Unpin + 'static, - B::Data: Send, - B::Error: Into, + for<'a> S: RpcServiceT<'a> + Send + Sync, { /// Send a subscription request to the server. Not implemented for HTTP; will always return /// [`Error::HttpNotImplemented`]. diff --git a/client/http-client/src/lib.rs b/client/http-client/src/lib.rs index 27138106fb..5e4e91468b 100644 --- a/client/http-client/src/lib.rs +++ b/client/http-client/src/lib.rs @@ -37,6 +37,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] mod client; +mod rpc_service; /// HTTP transport. pub mod transport; @@ -67,3 +68,6 @@ pub(crate) enum CertificateStore { Native, Custom(CustomCertStore), } + +#[derive(Copy, Clone, Debug)] +pub(crate) struct IsNotification; diff --git a/client/http-client/src/rpc_service.rs b/client/http-client/src/rpc_service.rs new file mode 100644 index 0000000000..f1fa5bb59c --- /dev/null +++ b/client/http-client/src/rpc_service.rs @@ -0,0 +1,59 @@ +use std::sync::Arc; + +use futures_util::{future::BoxFuture, FutureExt}; +use hyper::body::Bytes; +use jsonrpsee_core::{ + middleware::RpcServiceT, + server::{MethodResponse, ResponsePayload}, + BoxError, JsonRawValue, +}; +use jsonrpsee_types::{Id, Response}; +use tower::Service; + +use crate::{ + transport::{Error, HttpTransportClient}, + HttpRequest, HttpResponse, IsNotification, +}; + +#[derive(Clone, Debug)] +pub struct RpcService { + service: Arc>, + max_response_size: u32, +} + +impl RpcService { + pub fn new(service: HttpTransportClient, max_response_size: u32) -> Self { + Self { service: Arc::new(service), max_response_size } + } +} + +impl<'a, B, HttpMiddleware> RpcServiceT<'a> for RpcService +where + HttpMiddleware: Service, Error = Error> + Clone + Send + Sync + 'static, + HttpMiddleware::Future: Send, + B: http_body::Body + Send + 'static, + B::Data: Send, + B::Error: Into, +{ + type Future = BoxFuture<'a, MethodResponse>; + + fn call(&self, request: jsonrpsee_types::Request<'a>) -> Self::Future { + let raw = serde_json::to_string(&request).unwrap(); + let service = self.service.clone(); + let max_response_size = self.max_response_size; + + let is_notification = request.extensions().get::().is_some(); + + async move { + if is_notification { + service.send(raw).await.map_err(BoxError::from).unwrap(); + MethodResponse::response(Id::Null, ResponsePayload::success(""), max_response_size as usize) + } else { + let bytes = service.send_and_read_body(raw).await.map_err(BoxError::from).unwrap(); + let rp: Response> = serde_json::from_slice(&bytes).unwrap(); + MethodResponse::response(rp.id, rp.payload.into(), max_response_size as usize) + } + } + .boxed() + } +} diff --git a/client/http-client/src/transport.rs b/client/http-client/src/transport.rs index 56e2426bac..7660716f07 100644 --- a/client/http-client/src/transport.rs +++ b/client/http-client/src/transport.rs @@ -21,6 +21,7 @@ use jsonrpsee_core::{ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use thiserror::Error; use tower::layer::util::Identity; use tower::{Layer, Service, ServiceExt}; @@ -103,6 +104,8 @@ pub struct HttpTransportClientBuilder { pub(crate) service_builder: tower::ServiceBuilder, /// TCP_NODELAY pub(crate) tcp_no_delay: bool, + /// Request timeout + pub(crate) request_timeout: Duration, } impl Default for HttpTransportClientBuilder { @@ -123,6 +126,7 @@ impl HttpTransportClientBuilder { headers: HeaderMap::new(), service_builder: tower::ServiceBuilder::new(), tcp_no_delay: true, + request_timeout: Duration::from_secs(60), } } } @@ -182,6 +186,7 @@ impl HttpTransportClientBuilder { max_response_size: self.max_response_size, service_builder: service, tcp_no_delay: self.tcp_no_delay, + request_timeout: self.request_timeout, } } @@ -203,6 +208,7 @@ impl HttpTransportClientBuilder { headers, service_builder, tcp_no_delay, + request_timeout, } = self; let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?; @@ -288,6 +294,7 @@ impl HttpTransportClientBuilder { max_response_size, max_log_length, headers: cached_headers, + request_timeout, }) } } @@ -309,6 +316,8 @@ pub struct HttpTransportClient { max_log_length: u32, /// Custom headers to pass with every request. headers: HeaderMap, + /// Request timeout + request_timeout: Duration, } impl HttpTransportClient @@ -342,7 +351,9 @@ where pub(crate) async fn send_and_read_body(&self, body: String) -> Result, Error> { tx_log_from_str(&body, self.max_log_length); - let response = self.inner_send(body).await?; + let response = + tokio::time::timeout(self.request_timeout, self.inner_send(body)).await.map_err(|_| Error::Timeout)??; + let (parts, body) = response.into_parts(); let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?; @@ -354,7 +365,8 @@ where /// Send serialized message without reading the HTTP message body. pub(crate) async fn send(&self, body: String) -> Result<(), Error> { - let _ = self.inner_send(body).await?; + let _ = + tokio::time::timeout(self.request_timeout, self.inner_send(body)).await.map_err(|_| Error::Timeout)??; Ok(()) } @@ -385,6 +397,10 @@ pub enum Error { /// Invalid certificate store. #[error("Invalid certificate store")] InvalidCertficateStore, + + /// Timeout. + #[error("Request timed out")] + Timeout, } #[cfg(test)] diff --git a/core/Cargo.toml b/core/Cargo.toml index 1e20c7c481..82d4804dc9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,6 +31,7 @@ rustc-hash = { workspace = true, optional = true } rand = { workspace = true, optional = true } parking_lot = { workspace = true, optional = true } tokio = { workspace = true, optional = true } +tower = { workspace = true, optional = true } futures-timer = { workspace = true, optional = true } tokio-stream = { workspace = true, optional = true } pin-project = { workspace = true, optional = true } @@ -41,8 +42,8 @@ wasm-bindgen-futures = { workspace = true, optional = true } [features] default = [] http-helpers = ["bytes", "futures-util", "http-body", "http-body-util", "http"] -server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time", "http"] -client = ["futures-util/sink", "tokio/sync"] +server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time", "tower", "http"] +client = ["futures-util/sink", "tokio/sync", "tower"] async-client = [ "client", "futures-util/alloc", diff --git a/core/src/lib.rs b/core/src/lib.rs index b203f807f6..750cf10a69 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -57,6 +57,10 @@ cfg_client! { pub use client::Error as ClientError; } +cfg_client_or_server! { + pub mod middleware; +} + /// Shared tracing helpers to trace RPC calls. pub mod tracing; pub use async_trait::async_trait; diff --git a/core/src/middleware.rs b/core/src/middleware.rs new file mode 100644 index 0000000000..709d742ecc --- /dev/null +++ b/core/src/middleware.rs @@ -0,0 +1,78 @@ +//! Middleware for the RPC service. + +use futures_util::future::{Either, Future}; +use jsonrpsee_types::Request; +use tower::layer::util::{Identity, Stack}; +use tower::layer::LayerFn; + +use crate::server::MethodResponse; + +/// Similar to the [`tower::Service`] but specific for jsonrpsee and +/// doesn't requires `&mut self` for performance reasons. +pub trait RpcServiceT<'a> { + /// The future response value. + type Future: Future + Send; + + /// Process a single JSON-RPC call it may be a subscription or regular call. + /// In this interface they are treated in the same way but it's possible to + /// distinguish those based on the `MethodResponse`. + fn call(&self, request: Request<'a>) -> Self::Future; +} + +/// Similar to [`tower::ServiceBuilder`] but doesn't +/// support any tower middleware implementations. +#[derive(Debug, Clone)] +pub struct RpcServiceBuilder(tower::ServiceBuilder); + +impl Default for RpcServiceBuilder { + fn default() -> Self { + RpcServiceBuilder(tower::ServiceBuilder::new()) + } +} + +impl RpcServiceBuilder { + /// Create a new [`RpcServiceBuilder`]. + pub fn new() -> Self { + Self(tower::ServiceBuilder::new()) + } +} + +impl RpcServiceBuilder { + /// Optionally add a new layer `T` to the [`RpcServiceBuilder`]. + /// + /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details. + pub fn option_layer(self, layer: Option) -> RpcServiceBuilder, L>> { + let layer = if let Some(layer) = layer { Either::Left(layer) } else { Either::Right(Identity::new()) }; + self.layer(layer) + } + + /// Add a new layer `T` to the [`RpcServiceBuilder`]. + /// + /// See the documentation for [`tower::ServiceBuilder::layer`] for more details. + pub fn layer(self, layer: T) -> RpcServiceBuilder> { + RpcServiceBuilder(self.0.layer(layer)) + } + + /// Add a [`tower::Layer`] built from a function that accepts a service and returns another service. + /// + /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details. + pub fn layer_fn(self, f: F) -> RpcServiceBuilder, L>> { + RpcServiceBuilder(self.0.layer_fn(f)) + } + + /// Add a logging layer to [`RpcServiceBuilder`] + /// + /// This logs each request and response for every call. + /// + /*pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder> { + RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len))) + }*/ + + /// Wrap the service `S` with the middleware. + pub fn service(&self, service: S) -> L::Service + where + L: tower::Layer, + { + self.0.service(service) + } +} diff --git a/core/src/traits.rs b/core/src/traits.rs index 64e158c913..9d6afc5325 100644 --- a/core/src/traits.rs +++ b/core/src/traits.rs @@ -150,3 +150,5 @@ impl IdProvider for Box { (**self).next_id() } } + + diff --git a/examples/examples/http.rs b/examples/examples/http.rs index c65886ed21..9e7525db9c 100644 --- a/examples/examples/http.rs +++ b/examples/examples/http.rs @@ -29,13 +29,29 @@ use std::time::Duration; use hyper::body::Bytes; use jsonrpsee::core::client::ClientT; +use jsonrpsee::core::middleware::RpcServiceT; use jsonrpsee::http_client::HttpClient; use jsonrpsee::rpc_params; -use jsonrpsee::server::{RpcModule, Server}; +use jsonrpsee::server::{RpcModule, RpcServiceBuilder, Server}; +use jsonrpsee::types::Request; use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}; use tower_http::LatencyUnit; use tracing_subscriber::util::SubscriberInitExt; +struct Logger(S); + +impl<'a, S> RpcServiceT<'a> for Logger +where + S: RpcServiceT<'a>, +{ + type Future = S::Future; + + fn call(&self, req: Request<'a>) -> Self::Future { + println!("logger layer : {:?}", req); + self.0.call(req) + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let filter = tracing_subscriber::EnvFilter::try_from_default_env()? @@ -58,7 +74,9 @@ async fn main() -> anyhow::Result<()> { .on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)), ); - let client = HttpClient::builder().set_http_middleware(middleware).build(url)?; + let rpc = RpcServiceBuilder::new().layer_fn(|service| Logger(service)); + + let client = HttpClient::builder().set_http_middleware(middleware).set_rpc_middleware(rpc).build(url)?; let params = rpc_params![1_u64, 2, 3]; let response: Result = client.request("say_hello", params).await; tracing::info!("r: {:?}", response); diff --git a/server/src/middleware/rpc/mod.rs b/server/src/middleware/rpc/mod.rs index abe25ba0ed..244e926f46 100644 --- a/server/src/middleware/rpc/mod.rs +++ b/server/src/middleware/rpc/mod.rs @@ -29,80 +29,5 @@ pub mod layer; pub use layer::*; -use futures_util::Future; -use jsonrpsee_core::server::MethodResponse; -use jsonrpsee_types::Request; -use layer::either::Either; - -use tower::layer::util::{Identity, Stack}; -use tower::layer::LayerFn; - -/// Similar to the [`tower::Service`] but specific for jsonrpsee and -/// doesn't requires `&mut self` for performance reasons. -pub trait RpcServiceT<'a> { - /// The future response value. - type Future: Future + Send; - - /// Process a single JSON-RPC call it may be a subscription or regular call. - /// In this interface they are treated in the same way but it's possible to - /// distinguish those based on the `MethodResponse`. - fn call(&self, request: Request<'a>) -> Self::Future; -} - -/// Similar to [`tower::ServiceBuilder`] but doesn't -/// support any tower middleware implementations. -#[derive(Debug, Clone)] -pub struct RpcServiceBuilder(tower::ServiceBuilder); - -impl Default for RpcServiceBuilder { - fn default() -> Self { - RpcServiceBuilder(tower::ServiceBuilder::new()) - } -} - -impl RpcServiceBuilder { - /// Create a new [`RpcServiceBuilder`]. - pub fn new() -> Self { - Self(tower::ServiceBuilder::new()) - } -} - -impl RpcServiceBuilder { - /// Optionally add a new layer `T` to the [`RpcServiceBuilder`]. - /// - /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details. - pub fn option_layer(self, layer: Option) -> RpcServiceBuilder, L>> { - let layer = if let Some(layer) = layer { Either::Left(layer) } else { Either::Right(Identity::new()) }; - self.layer(layer) - } - - /// Add a new layer `T` to the [`RpcServiceBuilder`]. - /// - /// See the documentation for [`tower::ServiceBuilder::layer`] for more details. - pub fn layer(self, layer: T) -> RpcServiceBuilder> { - RpcServiceBuilder(self.0.layer(layer)) - } - - /// Add a [`tower::Layer`] built from a function that accepts a service and returns another service. - /// - /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details. - pub fn layer_fn(self, f: F) -> RpcServiceBuilder, L>> { - RpcServiceBuilder(self.0.layer_fn(f)) - } - - /// Add a logging layer to [`RpcServiceBuilder`] - /// - /// This logs each request and response for every call. - /// - pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder> { - RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len))) - } - - /// Wrap the service `S` with the middleware. - pub(crate) fn service(&self, service: S) -> L::Service - where - L: tower::Layer, - { - self.0.service(service) - } -} +pub use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; +pub use jsonrpsee_core::server::MethodResponse; From 401dfc1fecbb5ef0295ecb2b5390d6147328caf3 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Jan 2025 14:50:08 +0100 Subject: [PATCH 2/6] PoC works --- client/http-client/src/client.rs | 72 ++++++++--------------- client/http-client/src/lib.rs | 3 - client/http-client/src/rpc_service.rs | 55 +++++++++++++---- core/src/middleware.rs | 19 +++++- core/src/server/method_response.rs | 12 ++++ examples/examples/jsonrpsee_as_service.rs | 9 ++- examples/examples/ws.rs | 2 +- 7 files changed, 103 insertions(+), 69 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 3c13569408..2b24af8b64 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -24,6 +24,7 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::borrow::Cow as StdCow; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -31,18 +32,18 @@ use std::time::Duration; use crate::rpc_service::RpcService; use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClientBuilder}; use crate::types::Response; -use crate::{HttpRequest, HttpResponse, IsNotification}; +use crate::{HttpRequest, HttpResponse}; use async_trait::async_trait; use hyper::body::Bytes; use hyper::http::HeaderMap; use jsonrpsee_core::client::{ - BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, + generate_batch_id_range, BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, }; use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::params::BatchRequestBuilder; use jsonrpsee_core::traits::ToRpcParams; use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES}; -use jsonrpsee_types::{Id, InvalidRequestId, Request, ResponseSuccess}; +use jsonrpsee_types::{InvalidRequestId, Notification, Request, ResponseSuccess, TwoPointZero}; use serde::de::DeserializeOwned; use tokio::sync::Semaphore; use tower::layer::util::Identity; @@ -369,10 +370,8 @@ where None => None, }; let params = params.to_rpc_params()?; - let mut request = Request::new(method.into(), params.as_deref(), Id::Null); - request.extensions_mut().insert(IsNotification); - - self.transport.call(request).await; + let n = Notification { jsonrpc: TwoPointZero, method: method.into(), params }; + self.transport.notification(n).await; Ok(()) } @@ -410,9 +409,7 @@ where where R: DeserializeOwned + fmt::Debug + 'a, { - todo!(); - - /*let _permit = match self.request_guard.as_ref() { + let _permit = match self.request_guard.as_ref() { Some(permit) => permit.acquire().await.ok(), None => None, }; @@ -423,60 +420,37 @@ where let mut batch_request = Vec::with_capacity(batch.len()); for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { let id = self.id_manager.as_id_kind().into_id(id); - batch_request.push(RequestSer { + batch_request.push(Request { jsonrpc: TwoPointZero, - id, method: method.into(), params: params.map(StdCow::Owned), + id, + extensions: Default::default(), }); } - let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); - - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => return Err(Error::RequestTimeout), - Ok(Err(e)) => return Err(Error::Transport(e.into())), - }; + let batch = self.transport.batch(batch_request).await; + let responses: Vec> = serde_json::from_str(&batch.as_result()).unwrap(); - let json_rps: Vec> = serde_json::from_slice(&body).map_err(Error::ParseError)?; + let mut x = Vec::new(); + let mut success = 0; + let mut failed = 0; - let mut responses = Vec::with_capacity(json_rps.len()); - let mut successful_calls = 0; - let mut failed_calls = 0; - - for _ in 0..json_rps.len() { - responses.push(Err(ErrorObject::borrowed(0, "", None))); - } - - for rp in json_rps { - let id = rp.id.try_parse_inner_as_number()?; - - let res = match ResponseSuccess::try_from(rp) { + for rp in responses.into_iter() { + match ResponseSuccess::try_from(rp) { Ok(r) => { - let result = serde_json::from_str(r.result.get())?; - successful_calls += 1; - Ok(result) + let v = serde_json::from_str(r.result.get()).map_err(Error::ParseError)?; + x.push(Ok(v)); + success += 1; } Err(err) => { - failed_calls += 1; - Err(err) + x.push(Err(err)); + failed += 1; } }; - - let maybe_elem = id - .checked_sub(id_range.start) - .and_then(|p| p.try_into().ok()) - .and_then(|p: usize| responses.get_mut(p)); - - if let Some(elem) = maybe_elem { - *elem = res; - } else { - return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into()); - } } - Ok(BatchResponse::new(successful_calls, responses, failed_calls))*/ + Ok(BatchResponse::new(success, x, failed)) } } diff --git a/client/http-client/src/lib.rs b/client/http-client/src/lib.rs index 5e4e91468b..ade090a32c 100644 --- a/client/http-client/src/lib.rs +++ b/client/http-client/src/lib.rs @@ -68,6 +68,3 @@ pub(crate) enum CertificateStore { Native, Custom(CustomCertStore), } - -#[derive(Copy, Clone, Debug)] -pub(crate) struct IsNotification; diff --git a/client/http-client/src/rpc_service.rs b/client/http-client/src/rpc_service.rs index f1fa5bb59c..084c5672d6 100644 --- a/client/http-client/src/rpc_service.rs +++ b/client/http-client/src/rpc_service.rs @@ -4,15 +4,15 @@ use futures_util::{future::BoxFuture, FutureExt}; use hyper::body::Bytes; use jsonrpsee_core::{ middleware::RpcServiceT, - server::{MethodResponse, ResponsePayload}, + server::{BatchResponseBuilder, MethodResponse, ResponsePayload}, BoxError, JsonRawValue, }; -use jsonrpsee_types::{Id, Response}; +use jsonrpsee_types::{Id, Response, ResponseSuccess}; use tower::Service; use crate::{ transport::{Error, HttpTransportClient}, - HttpRequest, HttpResponse, IsNotification, + HttpRequest, HttpResponse, }; #[derive(Clone, Debug)] @@ -42,17 +42,50 @@ where let service = self.service.clone(); let max_response_size = self.max_response_size; - let is_notification = request.extensions().get::().is_some(); + async move { + let bytes = service.send_and_read_body(raw).await.map_err(BoxError::from).unwrap(); + let rp: Response> = serde_json::from_slice(&bytes).unwrap(); + MethodResponse::response(rp.id, rp.payload.into(), max_response_size as usize) + } + .boxed() + } + + fn batch(&self, requests: Vec>) -> Self::Future { + let raw = serde_json::to_string(&requests).unwrap(); + let service = self.service.clone(); + let max_response_size = self.max_response_size; async move { - if is_notification { - service.send(raw).await.map_err(BoxError::from).unwrap(); - MethodResponse::response(Id::Null, ResponsePayload::success(""), max_response_size as usize) - } else { - let bytes = service.send_and_read_body(raw).await.map_err(BoxError::from).unwrap(); - let rp: Response> = serde_json::from_slice(&bytes).unwrap(); - MethodResponse::response(rp.id, rp.payload.into(), max_response_size as usize) + let bytes = service.send_and_read_body(raw).await.map_err(BoxError::from).unwrap(); + let json_rps: Vec> = serde_json::from_slice(&bytes).unwrap(); + let mut batch = BatchResponseBuilder::new_with_limit(max_response_size as usize); + + for rp in json_rps { + let id = rp.id.try_parse_inner_as_number().unwrap(); + + let response = match ResponseSuccess::try_from(rp) { + Ok(r) => { + let payload = ResponsePayload::success(r.result); + MethodResponse::response(r.id, payload, max_response_size as usize) + } + Err(err) => MethodResponse::error(Id::Number(id), err), + }; + + batch.append(&response).unwrap(); } + + MethodResponse::from_batch(batch.finish()) + } + .boxed() + } + + fn notification(&self, notif: jsonrpsee_types::Notification<'a, Option>>) -> Self::Future { + let raw = serde_json::to_string(¬if).unwrap(); + let service = self.service.clone(); + + async move { + service.send(raw).await.map_err(BoxError::from).unwrap(); + MethodResponse::notification() } .boxed() } diff --git a/core/src/middleware.rs b/core/src/middleware.rs index 709d742ecc..1d9b5e9599 100644 --- a/core/src/middleware.rs +++ b/core/src/middleware.rs @@ -1,7 +1,8 @@ //! Middleware for the RPC service. use futures_util::future::{Either, Future}; -use jsonrpsee_types::Request; +use jsonrpsee_types::{Notification, Request}; +use serde_json::value::RawValue; use tower::layer::util::{Identity, Stack}; use tower::layer::LayerFn; @@ -14,9 +15,23 @@ pub trait RpcServiceT<'a> { type Future: Future + Send; /// Process a single JSON-RPC call it may be a subscription or regular call. - /// In this interface they are treated in the same way but it's possible to + /// + /// In this interface both are treated in the same way but it's possible to /// distinguish those based on the `MethodResponse`. fn call(&self, request: Request<'a>) -> Self::Future; + + /// Similar to `RpcServiceT::call` but process multiple JSON-RPC calls at once. + /// + /// This method is optional because it's generally not by the server however + /// it may be useful for batch processing on the client side. + fn batch(&self, _requests: Vec>) -> Self::Future { + todo!(); + } + + /// Similar to `RpcServiceT::call` but process a JSON-RPC notification. + fn notification(&self, _request: Notification<'a, Option>>) -> Self::Future { + todo!(); + } } /// Similar to [`tower::ServiceBuilder`] but doesn't diff --git a/core/src/server/method_response.rs b/core/src/server/method_response.rs index 30b3e84ea8..c84526a389 100644 --- a/core/src/server/method_response.rs +++ b/core/src/server/method_response.rs @@ -41,6 +41,7 @@ enum ResponseKind { MethodCall, Subscription, Batch, + Notification, } /// Represents a response to a method call. @@ -226,6 +227,17 @@ impl MethodResponse { } } + /// Create notification response which is a response that doesn't expect a reply. + pub fn notification() -> Self { + Self { + result: String::new(), + success_or_error: MethodResponseResult::Success, + kind: ResponseKind::Notification, + on_close: None, + extensions: Extensions::new(), + } + } + /// Returns a reference to the associated extensions. pub fn extensions(&self) -> &Extensions { &self.extensions diff --git a/examples/examples/jsonrpsee_as_service.rs b/examples/examples/jsonrpsee_as_service.rs index cb8c303144..a2f9641292 100644 --- a/examples/examples/jsonrpsee_as_service.rs +++ b/examples/examples/jsonrpsee_as_service.rs @@ -209,9 +209,12 @@ async fn run_server(metrics: Metrics) -> anyhow::Result { // NOTE, the rpc middleware must be initialized here to be able to created once per connection // with data from the connection such as the headers in this example let headers = req.headers().clone(); - let rpc_middleware = RpcServiceBuilder::new().rpc_logger(1024).layer_fn(move |service| { - AuthorizationMiddleware { inner: service, headers: headers.clone(), transport_label } - }); + let rpc_middleware = RpcServiceBuilder::new() /* .rpc_logger(1024)*/ + .layer_fn(move |service| AuthorizationMiddleware { + inner: service, + headers: headers.clone(), + transport_label, + }); let mut svc = svc_builder .set_http_middleware(http_middleware) diff --git a/examples/examples/ws.rs b/examples/examples/ws.rs index 6587585cc3..c84762d3b2 100644 --- a/examples/examples/ws.rs +++ b/examples/examples/ws.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - let rpc_middleware = RpcServiceBuilder::new().rpc_logger(1024); + let rpc_middleware = RpcServiceBuilder::new()/* .rpc_logger(1024)*/; let server = Server::builder().set_rpc_middleware(rpc_middleware).build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _, _| "lo")?; From 11d7b4dda66d469563d30458da56fefafef88550 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 29 Jan 2025 15:11:30 +0100 Subject: [PATCH 3/6] cargo fmt --- core/src/traits.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/traits.rs b/core/src/traits.rs index 9d6afc5325..64e158c913 100644 --- a/core/src/traits.rs +++ b/core/src/traits.rs @@ -150,5 +150,3 @@ impl IdProvider for Box { (**self).next_id() } } - - From 622bffd974446dd5afe58c6c49511817b148ad54 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 25 Feb 2025 19:06:14 +0100 Subject: [PATCH 4/6] more refactoring --- .../src/middleware}/layer/either.rs | 19 +++++- .../src/middleware}/layer/logger.rs | 23 +++++-- .../rpc => core/src/middleware/layer}/mod.rs | 9 ++- core/src/{middleware.rs => middleware/mod.rs} | 50 +++++++++++--- core/src/server/method_response.rs | 9 ++- examples/examples/http.rs | 12 +++- examples/examples/jsonrpsee_as_service.rs | 10 ++- .../jsonrpsee_server_low_level_api.rs | 10 ++- examples/examples/rpc_middleware.rs | 26 +++++++- .../examples/rpc_middleware_modify_request.rs | 10 ++- .../examples/rpc_middleware_rate_limiting.rs | 10 ++- .../server_with_connection_details.rs | 12 +++- server/src/lib.rs | 2 +- server/src/middleware/mod.rs | 1 - .../{rpc/layer/rpc_service.rs => rpc.rs} | 27 ++++++-- server/src/middleware/rpc/layer/mod.rs | 66 ------------------- server/src/server.rs | 55 ++++++---------- server/src/transport/http.rs | 13 ++-- server/src/transport/ws.rs | 43 +++++------- tests/tests/helpers.rs | 11 +++- tests/tests/integration_tests.rs | 6 +- tests/tests/metrics.rs | 10 ++- 22 files changed, 258 insertions(+), 176 deletions(-) rename {server/src/middleware/rpc => core/src/middleware}/layer/either.rs (79%) rename {server/src/middleware/rpc => core/src/middleware}/layer/logger.rs (81%) rename {server/src/middleware/rpc => core/src/middleware/layer}/mod.rs (83%) rename core/src/{middleware.rs => middleware/mod.rs} (68%) rename server/src/middleware/{rpc/layer/rpc_service.rs => rpc.rs} (86%) delete mode 100644 server/src/middleware/rpc/layer/mod.rs diff --git a/server/src/middleware/rpc/layer/either.rs b/core/src/middleware/layer/either.rs similarity index 79% rename from server/src/middleware/rpc/layer/either.rs rename to core/src/middleware/layer/either.rs index 01209323c3..b64464d974 100644 --- a/server/src/middleware/rpc/layer/either.rs +++ b/core/src/middleware/layer/either.rs @@ -31,7 +31,7 @@ //! work to implement tower::Layer for //! external types such as future::Either. -use crate::middleware::rpc::RpcServiceT; +use crate::middleware::RpcServiceT; use jsonrpsee_types::Request; /// [`tower::util::Either`] but @@ -72,4 +72,21 @@ where Either::Right(service) => futures_util::future::Either::Right(service.call(request)), } } + + fn batch(&self, requests: Vec>) -> Self::Future { + match self { + Either::Left(service) => futures_util::future::Either::Left(service.batch(requests)), + Either::Right(service) => futures_util::future::Either::Right(service.batch(requests)), + } + } + + fn notification( + &self, + request: jsonrpsee_types::Notification<'a, Option>>, + ) -> Self::Future { + match self { + Either::Left(service) => futures_util::future::Either::Left(service.notification(request)), + Either::Right(service) => futures_util::future::Either::Right(service.notification(request)), + } + } } diff --git a/server/src/middleware/rpc/layer/logger.rs b/core/src/middleware/layer/logger.rs similarity index 81% rename from server/src/middleware/rpc/layer/logger.rs rename to core/src/middleware/layer/logger.rs index 9849bce057..366ff622ce 100644 --- a/server/src/middleware/rpc/layer/logger.rs +++ b/core/src/middleware/layer/logger.rs @@ -31,17 +31,16 @@ use std::{ task::{Context, Poll}, }; -use futures_util::Future; -use jsonrpsee_core::{ - server::MethodResponse, +use crate::{ + middleware::{MethodResponse, Notification, RpcServiceT}, tracing::server::{rx_log_from_json, tx_log_from_str}, }; + +use futures_util::Future; use jsonrpsee_types::Request; use pin_project::pin_project; use tracing::{instrument::Instrumented, Instrument}; -use crate::middleware::rpc::RpcServiceT; - /// RPC logger layer. #[derive(Copy, Clone, Debug)] pub struct RpcLoggerLayer(u32); @@ -80,6 +79,20 @@ where ResponseFuture { fut: self.service.call(request), max: self.max }.in_current_span() } + + #[tracing::instrument(name = "batch", skip_all, fields(method = "batch"), level = "trace")] + fn batch(&self, requests: Vec>) -> Self::Future { + rx_log_from_json(&requests, self.max); + + ResponseFuture { fut: self.service.batch(requests), max: self.max }.in_current_span() + } + + #[tracing::instrument(name = "notification", skip_all, fields(method = &*n.method), level = "trace")] + fn notification(&self, n: Notification<'a>) -> Self::Future { + rx_log_from_json(&n, self.max); + + ResponseFuture { fut: self.service.notification(n), max: self.max }.in_current_span() + } } /// Response future to log the response for a method call. diff --git a/server/src/middleware/rpc/mod.rs b/core/src/middleware/layer/mod.rs similarity index 83% rename from server/src/middleware/rpc/mod.rs rename to core/src/middleware/layer/mod.rs index 244e926f46..e31c352ec9 100644 --- a/server/src/middleware/rpc/mod.rs +++ b/core/src/middleware/layer/mod.rs @@ -24,10 +24,9 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Various middleware implementations for JSON-RPC specific purposes. +//! Specific middleware layer implementation provided by jsonrpsee. -pub mod layer; -pub use layer::*; +pub mod either; +pub mod logger; -pub use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; -pub use jsonrpsee_core::server::MethodResponse; +pub use logger::*; diff --git a/core/src/middleware.rs b/core/src/middleware/mod.rs similarity index 68% rename from core/src/middleware.rs rename to core/src/middleware/mod.rs index 1d9b5e9599..58cffed70e 100644 --- a/core/src/middleware.rs +++ b/core/src/middleware/mod.rs @@ -1,13 +1,22 @@ //! Middleware for the RPC service. +pub mod layer; + use futures_util::future::{Either, Future}; -use jsonrpsee_types::{Notification, Request}; +use jsonrpsee_types::Request; +use pin_project::pin_project; use serde_json::value::RawValue; use tower::layer::util::{Identity, Stack}; use tower::layer::LayerFn; +use std::pin::Pin; +use std::task::{Context, Poll}; + use crate::server::MethodResponse; +/// JSON-RPC notification. +pub type Notification<'a> = jsonrpsee_types::Notification<'a, Option>>; + /// Similar to the [`tower::Service`] but specific for jsonrpsee and /// doesn't requires `&mut self` for performance reasons. pub trait RpcServiceT<'a> { @@ -24,14 +33,10 @@ pub trait RpcServiceT<'a> { /// /// This method is optional because it's generally not by the server however /// it may be useful for batch processing on the client side. - fn batch(&self, _requests: Vec>) -> Self::Future { - todo!(); - } + fn batch(&self, requests: Vec>) -> Self::Future; /// Similar to `RpcServiceT::call` but process a JSON-RPC notification. - fn notification(&self, _request: Notification<'a, Option>>) -> Self::Future { - todo!(); - } + fn notification(&self, n: Notification<'a>) -> Self::Future; } /// Similar to [`tower::ServiceBuilder`] but doesn't @@ -79,9 +84,9 @@ impl RpcServiceBuilder { /// /// This logs each request and response for every call. /// - /*pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder> { - RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len))) - }*/ + pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder> { + RpcServiceBuilder(self.0.layer(layer::RpcLoggerLayer::new(max_log_len))) + } /// Wrap the service `S` with the middleware. pub fn service(&self, service: S) -> L::Service @@ -91,3 +96,28 @@ impl RpcServiceBuilder { self.0.service(service) } } + +/// Response which may be ready or a future. +#[derive(Debug)] +#[pin_project] +pub struct ResponseFuture(#[pin] futures_util::future::Either>); + +impl ResponseFuture { + /// Returns a future that resolves to a response. + pub fn future(f: F) -> ResponseFuture { + ResponseFuture(Either::Left(f)) + } + + /// Return a response which is already computed. + pub fn ready(response: MethodResponse) -> ResponseFuture { + ResponseFuture(Either::Right(std::future::ready(response))) + } +} + +impl> Future for ResponseFuture { + type Output = MethodResponse; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().0.poll(cx) + } +} diff --git a/core/src/server/method_response.rs b/core/src/server/method_response.rs index c84526a389..f505c17272 100644 --- a/core/src/server/method_response.rs +++ b/core/src/server/method_response.rs @@ -86,6 +86,11 @@ impl MethodResponse { matches!(self.kind, ResponseKind::MethodCall) } + /// Returns whether the response is a notification response. + pub fn is_notification(&self) -> bool { + matches!(self.kind, ResponseKind::Notification) + } + /// Returns whether the response is a batch response. pub fn is_batch(&self) -> bool { matches!(self.kind, ResponseKind::Batch) @@ -443,11 +448,11 @@ pub struct MethodResponseFuture(tokio::sync::oneshot::Receiver); /// was succesful or not. #[derive(Debug, Copy, Clone)] pub enum NotifyMsg { - /// The response was succesfully processed. + /// The response was successfully processed. Ok, /// The response was the wrong kind /// such an error response when - /// one expected a succesful response. + /// one expected a successful response. Err, } diff --git a/examples/examples/http.rs b/examples/examples/http.rs index 9e7525db9c..af17815128 100644 --- a/examples/examples/http.rs +++ b/examples/examples/http.rs @@ -29,7 +29,7 @@ use std::time::Duration; use hyper::body::Bytes; use jsonrpsee::core::client::ClientT; -use jsonrpsee::core::middleware::RpcServiceT; +use jsonrpsee::core::middleware::{Notification, RpcServiceT}; use jsonrpsee::http_client::HttpClient; use jsonrpsee::rpc_params; use jsonrpsee::server::{RpcModule, RpcServiceBuilder, Server}; @@ -50,6 +50,16 @@ where println!("logger layer : {:?}", req); self.0.call(req) } + + fn batch(&self, reqs: Vec>) -> Self::Future { + println!("logger layer : {:?}", reqs); + self.0.batch(reqs) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + println!("logger layer : {:?}", n); + self.0.notification(n) + } } #[tokio::main] diff --git a/examples/examples/jsonrpsee_as_service.rs b/examples/examples/jsonrpsee_as_service.rs index a2f9641292..3daa87fb87 100644 --- a/examples/examples/jsonrpsee_as_service.rs +++ b/examples/examples/jsonrpsee_as_service.rs @@ -39,9 +39,9 @@ use futures::FutureExt; use hyper::header::AUTHORIZATION; use hyper::HeaderMap; use jsonrpsee::core::async_trait; +use jsonrpsee::core::middleware::{Notification, ResponseFuture, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::http_client::HttpClient; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::middleware::rpc::{ResponseFuture, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::{ serve_with_graceful_shutdown, stop_channel, ServerConfig, ServerHandle, StopHandle, TowerServiceBuilder, }; @@ -91,6 +91,14 @@ where ResponseFuture::future(self.inner.call(req)) } } + + fn batch(&self, reqs: Vec>) -> Self::Future { + ResponseFuture::future(self.inner.batch(reqs)) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + ResponseFuture::future(self.inner.notification(n)) + } } #[rpc(server, client)] diff --git a/examples/examples/jsonrpsee_server_low_level_api.rs b/examples/examples/jsonrpsee_server_low_level_api.rs index e7a458d58b..fc0f88d25c 100644 --- a/examples/examples/jsonrpsee_server_low_level_api.rs +++ b/examples/examples/jsonrpsee_server_low_level_api.rs @@ -47,9 +47,9 @@ use std::sync::{Arc, Mutex}; use futures::future::BoxFuture; use futures::FutureExt; use jsonrpsee::core::async_trait; +use jsonrpsee::core::middleware::{Notification, RpcServiceT}; use jsonrpsee::http_client::HttpClient; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::server::{ http, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder, ServerConfig, ServerHandle, StopHandle, @@ -98,6 +98,14 @@ where } .boxed() } + + fn batch(&self, reqs: Vec>) -> Self::Future { + Box::pin(self.service.batch(reqs)) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + Box::pin(self.service.notification(n)) + } } #[rpc(server, client)] diff --git a/examples/examples/rpc_middleware.rs b/examples/examples/rpc_middleware.rs index 4783257014..f4bc071534 100644 --- a/examples/examples/rpc_middleware.rs +++ b/examples/examples/rpc_middleware.rs @@ -44,8 +44,8 @@ use std::sync::Arc; use futures::future::BoxFuture; use futures::FutureExt; use jsonrpsee::core::client::ClientT; +use jsonrpsee::core::middleware::{Notification, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::rpc_params; -use jsonrpsee::server::middleware::rpc::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::{MethodResponse, RpcModule, Server}; use jsonrpsee::types::Request; use jsonrpsee::ws_client::WsClientBuilder; @@ -77,6 +77,14 @@ where } .boxed() } + + fn batch(&self, reqs: Vec>) -> Self::Future { + Box::pin(self.service.batch(reqs)) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + Box::pin(self.service.notification(n)) + } } #[derive(Clone)] @@ -104,6 +112,14 @@ where } .boxed() } + + fn batch(&self, reqs: Vec>) -> Self::Future { + Box::pin(self.service.batch(reqs)) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + Box::pin(self.service.notification(n)) + } } #[derive(Clone)] @@ -119,6 +135,14 @@ where println!("logger middleware: method `{}`", req.method); self.0.call(req) } + + fn batch(&self, reqs: Vec>) -> Self::Future { + self.0.batch(reqs) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + self.0.notification(n) + } } #[tokio::main] diff --git a/examples/examples/rpc_middleware_modify_request.rs b/examples/examples/rpc_middleware_modify_request.rs index f0f75e3584..548375c09f 100644 --- a/examples/examples/rpc_middleware_modify_request.rs +++ b/examples/examples/rpc_middleware_modify_request.rs @@ -25,7 +25,7 @@ // DEALINGS IN THE SOFTWARE. use jsonrpsee::core::client::ClientT; -use jsonrpsee::server::middleware::rpc::{RpcServiceBuilder, RpcServiceT}; +use jsonrpsee::core::middleware::{Notification, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::Server; use jsonrpsee::types::Request; use jsonrpsee::ws_client::WsClientBuilder; @@ -57,6 +57,14 @@ where self.0.call(req) } + + fn batch(&self, _reqs: Vec>) -> Self::Future { + todo!(); + } + + fn notification(&self, _n: Notification<'a>) -> Self::Future { + todo!(); + } } #[tokio::main] diff --git a/examples/examples/rpc_middleware_rate_limiting.rs b/examples/examples/rpc_middleware_rate_limiting.rs index cf058a4a66..ee08a0a019 100644 --- a/examples/examples/rpc_middleware_rate_limiting.rs +++ b/examples/examples/rpc_middleware_rate_limiting.rs @@ -32,7 +32,7 @@ //! such as `Arc` use jsonrpsee::core::client::ClientT; -use jsonrpsee::server::middleware::rpc::{ResponseFuture, RpcServiceBuilder, RpcServiceT}; +use jsonrpsee::core::middleware::{Notification, ResponseFuture, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::Server; use jsonrpsee::types::{ErrorObject, Request}; use jsonrpsee::ws_client::WsClientBuilder; @@ -126,6 +126,14 @@ where ResponseFuture::future(self.service.call(req)) } } + + fn batch(&self, requests: Vec>) -> Self::Future { + ResponseFuture::future(self.service.batch(requests)) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + ResponseFuture::future(self.service.notification(n)) + } } #[tokio::main] diff --git a/examples/examples/server_with_connection_details.rs b/examples/examples/server_with_connection_details.rs index 3e339a5b4c..bde27faf2d 100644 --- a/examples/examples/server_with_connection_details.rs +++ b/examples/examples/server_with_connection_details.rs @@ -27,9 +27,9 @@ use std::net::SocketAddr; use jsonrpsee::core::async_trait; +use jsonrpsee::core::middleware::{Notification, RpcServiceT}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::server::{PendingSubscriptionSink, SubscriptionMessage}; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use jsonrpsee::ws_client::WsClientBuilder; @@ -60,6 +60,14 @@ impl<'a, S: RpcServiceT<'a>> RpcServiceT<'a> for LoggingMiddleware { self.0.call(request) } + + fn batch(&self, _requests: Vec>) -> Self::Future { + todo!(); + } + + fn notification(&self, _n: Notification<'a>) -> Self::Future { + todo!(); + } } pub struct RpcServerImpl; @@ -127,7 +135,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new().layer_fn(LoggingMiddleware); + let rpc_middleware = jsonrpsee::server::RpcServiceBuilder::new().layer_fn(LoggingMiddleware); let server = jsonrpsee::server::Server::builder().set_rpc_middleware(rpc_middleware).build("127.0.0.1:0").await?; let addr = server.local_addr()?; diff --git a/server/src/lib.rs b/server/src/lib.rs index d949628b40..3dee440e63 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -44,10 +44,10 @@ mod tests; pub use future::{stop_channel, AlreadyStoppedError, ConnectionGuard, ConnectionPermit, ServerHandle, StopHandle}; pub use jsonrpsee_core::error::RegisterMethodError; +pub use jsonrpsee_core::middleware::RpcServiceBuilder; pub use jsonrpsee_core::server::*; pub use jsonrpsee_core::{id_providers::*, traits::IdProvider}; pub use jsonrpsee_types as types; -pub use middleware::rpc::RpcServiceBuilder; pub use server::{ BatchRequestConfig, Builder as ServerBuilder, ConnectionState, PingConfig, Server, ServerConfig, ServerConfigBuilder, TowerService, TowerServiceBuilder, diff --git a/server/src/middleware/mod.rs b/server/src/middleware/mod.rs index dff6948aa3..93b66a7b29 100644 --- a/server/src/middleware/mod.rs +++ b/server/src/middleware/mod.rs @@ -28,5 +28,4 @@ /// HTTP related middleware. pub mod http; -/// JSON-RPC specific middleware. pub mod rpc; diff --git a/server/src/middleware/rpc/layer/rpc_service.rs b/server/src/middleware/rpc.rs similarity index 86% rename from server/src/middleware/rpc/layer/rpc_service.rs rename to server/src/middleware/rpc.rs index 24d049477c..4a65e454b9 100644 --- a/server/src/middleware/rpc/layer/rpc_service.rs +++ b/server/src/middleware/rpc.rs @@ -26,14 +26,13 @@ //! JSON-RPC service middleware. -use super::ResponseFuture; use std::sync::Arc; -use crate::middleware::rpc::RpcServiceT; use crate::ConnectionId; -use futures_util::future::BoxFuture; +use futures_util::future::{BoxFuture, FutureExt}; +use jsonrpsee_core::middleware::{Notification, ResponseFuture, RpcServiceT}; use jsonrpsee_core::server::{ - BoundedSubscriptions, MethodCallback, MethodResponse, MethodSink, Methods, SubscriptionState, + BatchResponseBuilder, BoundedSubscriptions, MethodCallback, MethodResponse, MethodSink, Methods, SubscriptionState, }; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_types::error::{reject_too_many_subscriptions, ErrorCode}; @@ -147,4 +146,24 @@ impl<'a> RpcServiceT<'a> for RpcService { }, } } + + fn batch(&self, reqs: Vec>) -> Self::Future { + let mut batch = BatchResponseBuilder::new_with_limit(self.max_response_body_size); + let service = self.clone(); + let fut = async move { + for req in reqs { + let rp = service.call(req).await; + if let Err(err) = batch.append(&rp) { + return err; + } + } + MethodResponse::from_batch(batch.finish()) + } + .boxed(); + ResponseFuture::future(fut) + } + + fn notification(&self, _: Notification<'a>) -> Self::Future { + ResponseFuture::ready(MethodResponse::notification()) + } } diff --git a/server/src/middleware/rpc/layer/mod.rs b/server/src/middleware/rpc/layer/mod.rs deleted file mode 100644 index 16a37c8852..0000000000 --- a/server/src/middleware/rpc/layer/mod.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Specific middleware layer implementation provided by jsonrpsee. - -pub mod either; -pub mod logger; -pub mod rpc_service; - -pub use logger::*; -pub use rpc_service::*; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures_util::future::{Either, Future}; -use jsonrpsee_core::server::MethodResponse; -use pin_project::pin_project; - -/// Response which may be ready or a future. -#[derive(Debug)] -#[pin_project] -pub struct ResponseFuture(#[pin] futures_util::future::Either>); - -impl ResponseFuture { - /// Returns a future that resolves to a response. - pub fn future(f: F) -> ResponseFuture { - ResponseFuture(Either::Left(f)) - } - - /// Return a response which is already computed. - pub fn ready(response: MethodResponse) -> ResponseFuture { - ResponseFuture(Either::Right(std::future::ready(response))) - } -} - -impl> Future for ResponseFuture { - type Output = MethodResponse; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().0.poll(cx) - } -} diff --git a/server/src/server.rs b/server/src/server.rs index e06e837fd3..db1095a376 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -34,7 +34,7 @@ use std::task::Poll; use std::time::Duration; use crate::future::{session_close, ConnectionGuard, ServerHandle, SessionClose, SessionClosedFuture, StopHandle}; -use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; +use crate::middleware::rpc::{RpcService, RpcServiceCfg}; use crate::transport::ws::BackgroundTaskParams; use crate::transport::{http, ws}; use crate::utils::deserialize; @@ -46,10 +46,9 @@ use futures_util::io::{BufReader, BufWriter}; use hyper::body::Bytes; use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee_core::id_providers::RandomIntegerIdProvider; +use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::server::helpers::prepare_error; -use jsonrpsee_core::server::{ - BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods, -}; +use jsonrpsee_core::server::{BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES}; @@ -65,7 +64,7 @@ use tower::layer::util::Identity; use tower::{Layer, Service}; use tracing::{instrument, Instrument}; -type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>; +type Notif<'a> = Notification<'a, Option>>; /// Default maximum connections allowed. const MAX_CONNECTIONS: u32 = 100; @@ -1102,9 +1101,7 @@ where )); Box::pin(async move { - let rp = - http::call_with_service(request, batch_config, max_request_size, rpc_service, max_response_size) - .await; + let rp = http::call_with_service(request, batch_config, max_request_size, rpc_service).await; // NOTE: The `conn guard` must be held until the response is processed // to respect the `max_connections` limit. drop(conn); @@ -1230,22 +1227,21 @@ pub(crate) async fn handle_rpc_call( body: &[u8], is_single: bool, batch_config: BatchRequestConfig, - max_response_size: u32, rpc_service: &S, extensions: Extensions, -) -> Option +) -> MethodResponse where for<'a> S: RpcServiceT<'a> + Send, { // Single request or notification if is_single { if let Ok(req) = deserialize::from_slice_with_extensions(body, extensions) { - Some(rpc_service.call(req).await) - } else if let Ok(_notif) = serde_json::from_slice::(body) { - None + rpc_service.call(req).await + } else if let Ok(notif) = serde_json::from_slice::(body) { + rpc_service.notification(notif).await } else { let (id, code) = prepare_error(body); - Some(MethodResponse::error(id, ErrorObject::from(code))) + MethodResponse::error(id, ErrorObject::from(code)) } } // Batch of requests. @@ -1256,7 +1252,7 @@ where Id::Null, ErrorObject::borrowed(BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, None), ); - return Some(rp); + return rp; } BatchRequestConfig::Limit(limit) => limit as usize, BatchRequestConfig::Unlimited => usize::MAX, @@ -1264,45 +1260,36 @@ where if let Ok(batch) = serde_json::from_slice::>(body) { if batch.len() > max_len { - return Some(MethodResponse::error(Id::Null, reject_too_big_batch_request(max_len))); + return MethodResponse::error(Id::Null, reject_too_big_batch_request(max_len)); } + let mut b = Vec::with_capacity(batch.len()); let mut got_notif = false; - let mut batch_response = BatchResponseBuilder::new_with_limit(max_response_size as usize); for call in batch { if let Ok(req) = deserialize::from_str_with_extensions(call.get(), extensions.clone()) { - let rp = rpc_service.call(req).await; - - if let Err(too_large) = batch_response.append(&rp) { - return Some(too_large); - } + b.push(req); } else if let Ok(_notif) = serde_json::from_str::(call.get()) { - // notifications should not be answered. got_notif = true; } else { - // valid JSON but could be not parsable as `InvalidRequest` let id = match serde_json::from_str::(call.get()) { Ok(err) => err.id, Err(_) => Id::Null, }; - if let Err(too_large) = - batch_response.append(&MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))) - { - return Some(too_large); - } + return MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest)); } } - if got_notif && batch_response.is_empty() { - None + let batch_response = rpc_service.batch(b).await; + + if got_notif && batch_response.as_result().len() == 0 { + MethodResponse::notification() } else { - let batch_rp = batch_response.finish(); - Some(MethodResponse::from_batch(batch_rp)) + batch_response } } else { - Some(MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))) + MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError)) } } } diff --git a/server/src/transport/http.rs b/server/src/transport/http.rs index 1d0b7d5cd7..f45f4bc30e 100644 --- a/server/src/transport/http.rs +++ b/server/src/transport/http.rs @@ -1,5 +1,5 @@ use crate::{ - middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}, + middleware::rpc::{RpcService, RpcServiceCfg}, server::{handle_rpc_call, ServerConfig}, BatchRequestConfig, ConnectionState, HttpRequest, HttpResponse, LOG_TARGET, }; @@ -7,6 +7,7 @@ use http::Method; use hyper::body::{Body, Bytes}; use jsonrpsee_core::{ http_helpers::{read_body, HttpError}, + middleware::{RpcServiceBuilder, RpcServiceT}, server::Methods, BoxError, }; @@ -55,9 +56,7 @@ where RpcServiceCfg::OnlyCalls, )); - let rp = - call_with_service(request, batch_requests_config, max_request_body_size, rpc_service, max_response_body_size) - .await; + let rp = call_with_service(request, batch_requests_config, max_request_body_size, rpc_service).await; drop(conn); @@ -72,7 +71,6 @@ pub async fn call_with_service( batch_config: BatchRequestConfig, max_request_size: u32, rpc_service: S, - max_response_size: u32, ) -> HttpResponse where B: http_body::Body + Send + 'static, @@ -95,12 +93,11 @@ where } }; - let rp = handle_rpc_call(&body, is_single, batch_config, max_response_size, &rpc_service, parts.extensions) - .await; + let rp = handle_rpc_call(&body, is_single, batch_config, &rpc_service, parts.extensions).await; // If the response is empty it means that it was a notification or empty batch. // For HTTP these are just ACK:ed with a empty body. - response::ok_response(rp.map_or(String::new(), |r| r.into_result())) + response::ok_response(rp.into_result()) } // Error scenarios: Method::POST => response::unsupported_content_type(), diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index 8bccb14e25..2c3bf1abad 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Instant; use crate::future::{IntervalStream, SessionClose}; -use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; +use crate::middleware::rpc::{RpcService, RpcServiceCfg}; use crate::server::{handle_rpc_call, ConnectionState, ServerConfig}; use crate::{HttpBody, HttpRequest, HttpResponse, PingConfig, LOG_TARGET}; @@ -11,6 +11,7 @@ use futures_util::io::{BufReader, BufWriter}; use futures_util::{Future, StreamExt, TryStreamExt}; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; +use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::server::{BoundedSubscriptions, MethodSink, Methods}; use jsonrpsee_types::error::{reject_too_big_request, ErrorCode}; use jsonrpsee_types::Id; @@ -76,8 +77,7 @@ where mut on_session_close, extensions, } = params; - let ServerConfig { ping_config, batch_requests_config, max_request_body_size, max_response_body_size, .. } = - server_cfg; + let ServerConfig { ping_config, batch_requests_config, max_request_body_size, .. } = server_cfg; let (conn_tx, conn_rx) = oneshot::channel(); @@ -157,30 +157,21 @@ where } }; - if let Some(rp) = handle_rpc_call( - &data[idx..], - is_single, - batch_requests_config, - max_response_body_size, - &*rpc_service, - extensions, - ) - .await - { - if !rp.is_subscription() { - let is_success = rp.is_success(); - let (serialized_rp, mut on_close) = rp.into_parts(); - - // The connection is closed, just quit. - if sink.send(serialized_rp).await.is_err() { - return; - } + let rp = handle_rpc_call(&data[idx..], is_single, batch_requests_config, &*rpc_service, extensions).await; - // Notify that the message has been sent out to the internal - // WebSocket buffer. - if let Some(n) = on_close.take() { - n.notify(is_success); - } + if !rp.is_subscription() || !rp.is_notification() { + let is_success = rp.is_success(); + let (serialized_rp, mut on_close) = rp.into_parts(); + + // The connection is closed, just quit. + if sink.send(serialized_rp).await.is_err() { + return; + } + + // Notify that the message has been sent out to the internal + // WebSocket buffer. + if let Some(n) = on_close.take() { + n.notify(is_success); } } }); diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 86481a7ab7..845634e08b 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -35,9 +35,10 @@ use std::time::Duration; use fast_socks5::client::Socks5Stream; use fast_socks5::server; use futures::{SinkExt, Stream, StreamExt}; +use jsonrpsee::core::middleware::Notification; use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; -use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::core::middleware::RpcServiceT; use jsonrpsee::server::{ serve_with_graceful_shutdown, stop_channel, ConnectionGuard, PendingSubscriptionSink, RpcModule, RpcServiceBuilder, Server, ServerBuilder, ServerHandle, SubscriptionMessage, TrySendError, @@ -160,6 +161,14 @@ pub async fn server() -> SocketAddr { request.extensions_mut().insert(self.connection_id); self.inner.call(request) } + + fn batch(&self, requests: Vec>) -> Self::Future { + self.inner.batch(requests) + } + + fn notification(&self, n: Notification<'a>) -> Self::Future { + self.inner.notification(n) + } } let mut module = RpcModule::new(()); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 81140a1d25..b0a4e98a45 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -1466,10 +1466,10 @@ async fn server_ws_low_api_works() { async fn run_server() -> anyhow::Result { use futures_util::future::FutureExt; - use jsonrpsee::core::BoxError; + use jsonrpsee::core::{middleware::RpcServiceBuilder, BoxError}; use jsonrpsee::server::{ - http, middleware::rpc::RpcServiceBuilder, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, - ConnectionState, Methods, ServerConfig, StopHandle, + http, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, ConnectionState, Methods, + ServerConfig, StopHandle, }; let listener = tokio::net::TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 0))).await?; diff --git a/tests/tests/metrics.rs b/tests/tests/metrics.rs index c8cef4d626..85e41e88e0 100644 --- a/tests/tests/metrics.rs +++ b/tests/tests/metrics.rs @@ -37,10 +37,10 @@ use std::time::Duration; use futures::future::BoxFuture; use futures::FutureExt; use helpers::init_logger; +use jsonrpsee::core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee::core::{client::ClientT, ClientError}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::middleware::rpc::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::{Server, ServerHandle}; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Id, Request}; use jsonrpsee::ws_client::WsClientBuilder; @@ -97,6 +97,14 @@ where } .boxed() } + + fn batch(&self, _requests: Vec>) -> Self::Future { + todo!(); + } + + fn notification(&self, _n: jsonrpsee::core::middleware::Notification<'a>) -> Self::Future { + todo!(); + } } fn test_module() -> RpcModule<()> { From 93c9b6f395bc47c3dccb3ea0827a704399a77f8f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Feb 2025 10:28:50 +0100 Subject: [PATCH 5/6] use Cow in Notification to avoid alloc --- client/http-client/src/client.rs | 2 +- client/http-client/src/rpc_service.rs | 4 ++-- core/src/middleware/layer/either.rs | 11 ++++------- core/src/middleware/mod.rs | 3 ++- server/src/server.rs | 10 ++++------ 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 2b24af8b64..bffd76437d 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -369,7 +369,7 @@ where Some(permit) => permit.acquire().await.ok(), None => None, }; - let params = params.to_rpc_params()?; + let params = params.to_rpc_params()?.map(StdCow::Owned); let n = Notification { jsonrpc: TwoPointZero, method: method.into(), params }; self.transport.notification(n).await; Ok(()) diff --git a/client/http-client/src/rpc_service.rs b/client/http-client/src/rpc_service.rs index 963dcc0b5a..02536c7a52 100644 --- a/client/http-client/src/rpc_service.rs +++ b/client/http-client/src/rpc_service.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures_util::{future::BoxFuture, FutureExt}; use hyper::body::Bytes; use jsonrpsee_core::{ - middleware::RpcServiceT, + middleware::{Notification, RpcServiceT}, server::{BatchResponseBuilder, MethodResponse, ResponsePayload}, BoxError, JsonRawValue, }; @@ -81,7 +81,7 @@ where .boxed() } - fn notification(&self, notif: jsonrpsee_types::Notification<'a, Option>>) -> Self::Future { + fn notification(&self, notif: Notification<'a>) -> Self::Future { let raw = serde_json::to_string(¬if).unwrap(); let service = self.service.clone(); diff --git a/core/src/middleware/layer/either.rs b/core/src/middleware/layer/either.rs index b64464d974..b7d3766fbe 100644 --- a/core/src/middleware/layer/either.rs +++ b/core/src/middleware/layer/either.rs @@ -31,7 +31,7 @@ //! work to implement tower::Layer for //! external types such as future::Either. -use crate::middleware::RpcServiceT; +use crate::middleware::{Notification, RpcServiceT}; use jsonrpsee_types::Request; /// [`tower::util::Either`] but @@ -80,13 +80,10 @@ where } } - fn notification( - &self, - request: jsonrpsee_types::Notification<'a, Option>>, - ) -> Self::Future { + fn notification(&self, n: Notification<'a>) -> Self::Future { match self { - Either::Left(service) => futures_util::future::Either::Left(service.notification(request)), - Either::Right(service) => futures_util::future::Either::Right(service.notification(request)), + Either::Left(service) => futures_util::future::Either::Left(service.notification(n)), + Either::Right(service) => futures_util::future::Either::Right(service.notification(n)), } } } diff --git a/core/src/middleware/mod.rs b/core/src/middleware/mod.rs index 58cffed70e..f8d241f91d 100644 --- a/core/src/middleware/mod.rs +++ b/core/src/middleware/mod.rs @@ -9,13 +9,14 @@ use serde_json::value::RawValue; use tower::layer::util::{Identity, Stack}; use tower::layer::LayerFn; +use std::borrow::Cow; use std::pin::Pin; use std::task::{Context, Poll}; use crate::server::MethodResponse; /// JSON-RPC notification. -pub type Notification<'a> = jsonrpsee_types::Notification<'a, Option>>; +pub type Notification<'a> = jsonrpsee_types::Notification<'a, Option>>; /// Similar to the [`tower::Service`] but specific for jsonrpsee and /// doesn't requires `&mut self` for performance reasons. diff --git a/server/src/server.rs b/server/src/server.rs index 656b4d7d54..a4b9b3c440 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -46,7 +46,7 @@ use futures_util::io::{BufReader, BufWriter}; use hyper::body::Bytes; use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee_core::id_providers::RandomIntegerIdProvider; -use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; +use jsonrpsee_core::middleware::{Notification, RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::server::helpers::prepare_error; use jsonrpsee_core::server::{BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods}; use jsonrpsee_core::traits::IdProvider; @@ -55,7 +55,7 @@ use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES}; use jsonrpsee_types::error::{ reject_too_big_batch_request, ErrorCode, BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, }; -use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification}; +use jsonrpsee_types::{ErrorObject, Id, InvalidRequest}; use soketto::handshake::http::is_upgrade_request; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio::sync::{mpsc, watch, OwnedSemaphorePermit}; @@ -64,8 +64,6 @@ use tower::layer::util::Identity; use tower::{Layer, Service}; use tracing::{instrument, Instrument}; -type Notif<'a> = Notification<'a, Option>>; - /// Default maximum connections allowed. const MAX_CONNECTIONS: u32 = 100; @@ -1238,7 +1236,7 @@ where if is_single { if let Ok(req) = deserialize::from_slice_with_extensions(body, extensions) { rpc_service.call(req).await - } else if let Ok(notif) = serde_json::from_slice::(body) { + } else if let Ok(notif) = serde_json::from_slice::(body) { rpc_service.notification(notif).await } else { let (id, code) = prepare_error(body); @@ -1270,7 +1268,7 @@ where for call in unchecked_batch { if let Ok(req) = deserialize::from_str_with_extensions(call.get(), extensions.clone()) { batch.push(req); - } else if let Ok(_notif) = serde_json::from_str::(call.get()) { + } else if let Ok(_notif) = serde_json::from_str::(call.get()) { got_notif = true; } else { let id = match serde_json::from_str::(call.get()) { From 5929436015cdab81b636762059060d697f5d8f72 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 26 Feb 2025 10:55:08 +0100 Subject: [PATCH 6/6] cleanup some todos --- core/src/middleware/mod.rs | 5 +++-- examples/examples/jsonrpsee_as_service.rs | 9 +++------ examples/examples/rpc_middleware_modify_request.rs | 8 ++++---- examples/examples/server_with_connection_details.rs | 12 ++++++------ examples/examples/ws.rs | 2 +- tests/tests/metrics.rs | 12 ++++++------ 6 files changed, 23 insertions(+), 25 deletions(-) diff --git a/core/src/middleware/mod.rs b/core/src/middleware/mod.rs index f8d241f91d..d137f7cd95 100644 --- a/core/src/middleware/mod.rs +++ b/core/src/middleware/mod.rs @@ -3,7 +3,6 @@ pub mod layer; use futures_util::future::{Either, Future}; -use jsonrpsee_types::Request; use pin_project::pin_project; use serde_json::value::RawValue; use tower::layer::util::{Identity, Stack}; @@ -15,8 +14,10 @@ use std::task::{Context, Poll}; use crate::server::MethodResponse; -/// JSON-RPC notification. +/// Re-export types from `jsonrpsee_types` crate for convenience pub type Notification<'a> = jsonrpsee_types::Notification<'a, Option>>; +/// Re-export types from `jsonrpsee_types` crate for convenience +pub use jsonrpsee_types::Request; /// Similar to the [`tower::Service`] but specific for jsonrpsee and /// doesn't requires `&mut self` for performance reasons. diff --git a/examples/examples/jsonrpsee_as_service.rs b/examples/examples/jsonrpsee_as_service.rs index 3daa87fb87..decaa8b8d6 100644 --- a/examples/examples/jsonrpsee_as_service.rs +++ b/examples/examples/jsonrpsee_as_service.rs @@ -217,12 +217,9 @@ async fn run_server(metrics: Metrics) -> anyhow::Result { // NOTE, the rpc middleware must be initialized here to be able to created once per connection // with data from the connection such as the headers in this example let headers = req.headers().clone(); - let rpc_middleware = RpcServiceBuilder::new() /* .rpc_logger(1024)*/ - .layer_fn(move |service| AuthorizationMiddleware { - inner: service, - headers: headers.clone(), - transport_label, - }); + let rpc_middleware = RpcServiceBuilder::new().rpc_logger(1024).layer_fn(move |service| { + AuthorizationMiddleware { inner: service, headers: headers.clone(), transport_label } + }); let mut svc = svc_builder .set_http_middleware(http_middleware) diff --git a/examples/examples/rpc_middleware_modify_request.rs b/examples/examples/rpc_middleware_modify_request.rs index 548375c09f..09c62821e6 100644 --- a/examples/examples/rpc_middleware_modify_request.rs +++ b/examples/examples/rpc_middleware_modify_request.rs @@ -58,12 +58,12 @@ where self.0.call(req) } - fn batch(&self, _reqs: Vec>) -> Self::Future { - todo!(); + fn batch(&self, requests: Vec>) -> Self::Future { + self.0.batch(requests) } - fn notification(&self, _n: Notification<'a>) -> Self::Future { - todo!(); + fn notification(&self, n: Notification<'a>) -> Self::Future { + self.0.notification(n) } } diff --git a/examples/examples/server_with_connection_details.rs b/examples/examples/server_with_connection_details.rs index bde27faf2d..cd9c203698 100644 --- a/examples/examples/server_with_connection_details.rs +++ b/examples/examples/server_with_connection_details.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use jsonrpsee::core::async_trait; -use jsonrpsee::core::middleware::{Notification, RpcServiceT}; +use jsonrpsee::core::middleware::{Notification, Request, RpcServiceT}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::{PendingSubscriptionSink, SubscriptionMessage}; @@ -54,19 +54,19 @@ struct LoggingMiddleware(S); impl<'a, S: RpcServiceT<'a>> RpcServiceT<'a> for LoggingMiddleware { type Future = S::Future; - fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future { + fn call(&self, request: Request<'a>) -> Self::Future { tracing::info!("Received request: {:?}", request); assert!(request.extensions().get::().is_some()); self.0.call(request) } - fn batch(&self, _requests: Vec>) -> Self::Future { - todo!(); + fn batch(&self, requests: Vec>) -> Self::Future { + self.0.batch(requests) } - fn notification(&self, _n: Notification<'a>) -> Self::Future { - todo!(); + fn notification(&self, n: Notification<'a>) -> Self::Future { + self.0.notification(n) } } diff --git a/examples/examples/ws.rs b/examples/examples/ws.rs index c84762d3b2..6587585cc3 100644 --- a/examples/examples/ws.rs +++ b/examples/examples/ws.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - let rpc_middleware = RpcServiceBuilder::new()/* .rpc_logger(1024)*/; + let rpc_middleware = RpcServiceBuilder::new().rpc_logger(1024); let server = Server::builder().set_rpc_middleware(rpc_middleware).build("127.0.0.1:0").await?; let mut module = RpcModule::new(()); module.register_method("say_hello", |_, _, _| "lo")?; diff --git a/tests/tests/metrics.rs b/tests/tests/metrics.rs index 85e41e88e0..45d5a8c1d0 100644 --- a/tests/tests/metrics.rs +++ b/tests/tests/metrics.rs @@ -37,12 +37,12 @@ use std::time::Duration; use futures::future::BoxFuture; use futures::FutureExt; use helpers::init_logger; -use jsonrpsee::core::middleware::{RpcServiceBuilder, RpcServiceT}; +use jsonrpsee::core::middleware::{Notification, Request, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::core::{client::ClientT, ClientError}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::{Server, ServerHandle}; -use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Id, Request}; +use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Id}; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::RpcModule; use jsonrpsee::{rpc_params, MethodResponse}; @@ -98,12 +98,12 @@ where .boxed() } - fn batch(&self, _requests: Vec>) -> Self::Future { - todo!(); + fn batch(&self, requests: Vec>) -> Self::Future { + self.service.batch(requests).boxed() } - fn notification(&self, _n: jsonrpsee::core::middleware::Notification<'a>) -> Self::Future { - todo!(); + fn notification(&self, n: Notification<'a>) -> Self::Future { + self.service.notification(n).boxed() } }