Skip to content

Commit

Permalink
rpc: bump jsonrpsee v0.22 and fix race in rpc v2 chain_head (parity…
Browse files Browse the repository at this point in the history
…tech#3230)

Close paritytech#2992 

Breaking changes:
- rpc server grafana metric `substrate_rpc_requests_started` is removed
(not possible to implement anymore)
- rpc server grafana metric `substrate_rpc_requests_finished` is removed
(not possible to implement anymore)
- rpc server ws ping/pong not ACK:ed within 30 seconds more than three
times then the connection will be closed

Added
- rpc server grafana metric `substrate_rpc_sessions_time` is added to
get the duration for each websocket session
  • Loading branch information
niklasad1 authored Feb 14, 2024
1 parent 895af2c commit 9a55dd3
Show file tree
Hide file tree
Showing 41 changed files with 560 additions and 369 deletions.
2 changes: 1 addition & 1 deletion substrate/bin/minimal/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ name = "minimal-node"
clap = { version = "4.4.18", features = ["derive"] }
futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.1"
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
serde_json = "1.0.111"

sc-cli = { path = "../../../client/cli" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/bin/node-template/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ frame-system = { path = "../../../frame/system" }
pallet-transaction-payment = { path = "../../../frame/transaction-payment", default-features = false }

# These dependencies are used for the node template's RPCs
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
sp-api = { path = "../../../primitives/api" }
sc-rpc-api = { path = "../../../client/rpc-api" }
sp-blockchain = { path = "../../../primitives/blockchain" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ array-bytes = "6.1"
clap = { version = "4.4.18", features = ["derive"], optional = true }
codec = { package = "parity-scale-codec", version = "3.6.1" }
serde = { version = "1.0.195", features = ["derive"] }
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
futures = "0.3.21"
log = { workspace = true, default-features = true }
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion substrate/bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
node-primitives = { path = "../primitives" }
pallet-transaction-payment-rpc = { path = "../../../frame/transaction-payment/rpc" }
mmr-rpc = { path = "../../../client/merkle-mountain-range/rpc" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/babe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
futures = "0.3.21"
serde = { version = "1.0.195", features = ["derive"] }
thiserror = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/consensus/babe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ mod tests {
let (response, _) = api.raw_json_request(request, 1).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":{"5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY":{"primary":[0],"secondary":[1,2,4],"secondary_vrf":[]}},"id":1}"#;

assert_eq!(&response.result, expected);
assert_eq!(response, expected);
}

#[tokio::test]
Expand All @@ -272,6 +272,6 @@ mod tests {
let (response, _) = api.raw_json_request(request, 1).await.unwrap();
let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"RPC call is unsafe to be called externally"},"id":1}"#;

assert_eq!(&response.result, expected);
assert_eq!(response, expected);
}
}
2 changes: 1 addition & 1 deletion substrate/client/consensus/beefy/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] }
futures = "0.3.21"
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
log = { workspace = true, default-features = true }
parking_lot = "0.12.1"
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
16 changes: 7 additions & 9 deletions substrate/client/consensus/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ mod tests {
async fn uninitialized_rpc_handler() {
let (rpc, _) = setup_io_handler();
let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#.to_string();
let expected_response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();

assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}

#[tokio::test]
Expand All @@ -205,20 +205,18 @@ mod tests {
\"jsonrpc\":\"2.0\",\
\"result\":\"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf\",\
\"id\":1\
}"
.to_string();
}";
let not_ready = "{\
\"jsonrpc\":\"2.0\",\
\"error\":{\"code\":1,\"message\":\"BEEFY RPC endpoint not ready\"},\
\"id\":1\
}"
.to_string();
}";

let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while std::time::Instant::now() < deadline {
let (response, _) = io.raw_json_request(request, 1).await.expect("RPC requests work");
if response.result != not_ready {
assert_eq!(response.result, expected);
if response != not_ready {
assert_eq!(response, expected);
// Success
return
}
Expand Down Expand Up @@ -249,7 +247,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;

assert_eq!(response.result, expected);
assert_eq!(response, expected);
}

fn create_finality_proof() -> BeefyVersionedFinalityProof<Block> {
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ workspace = true
[dependencies]
finality-grandpa = { version = "0.16.2", features = ["derive-codec"] }
futures = "0.3.16"
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
log = { workspace = true, default-features = true }
parity-scale-codec = { version = "3.6.1", features = ["derive"] }
serde = { version = "1.0.195", features = ["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/consensus/grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ mod tests {
let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();

assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}

#[tokio::test]
Expand All @@ -295,7 +295,7 @@ mod tests {

let request = r#"{"jsonrpc":"2.0","method":"grandpa_roundState","params":[],"id":0}"#;
let (response, _) = rpc.raw_json_request(&request, 1).await.unwrap();
assert_eq!(expected_response, response.result);
assert_eq!(expected_response, response);
}

#[tokio::test]
Expand All @@ -317,7 +317,7 @@ mod tests {
.unwrap();
let expected = r#"{"jsonrpc":"2.0","result":false,"id":1}"#;

assert_eq!(response.result, expected);
assert_eq!(response, expected);
}

fn create_justification() -> GrandpaJustification<Block> {
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
assert_matches = "1.3.0"
async-trait = "0.1.74"
codec = { package = "parity-scale-codec", version = "3.6.1" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/merkle-mountain-range/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
codec = { package = "parity-scale-codec", version = "3.6.1" }
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
serde = { version = "1.0.195", features = ["derive"] }
sp-api = { path = "../../../primitives/api" }
sp-blockchain = { path = "../../../primitives/blockchain" }
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/rpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ sp-core = { path = "../../primitives/core" }
sp-rpc = { path = "../../primitives/rpc" }
sp-runtime = { path = "../../primitives/runtime" }
sp-version = { path = "../../primitives/version" }
jsonrpsee = { version = "0.20.3", features = ["client-core", "macros", "server"] }
jsonrpsee = { version = "0.22", features = ["client-core", "macros", "server"] }
5 changes: 4 additions & 1 deletion substrate/client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.20.3", features = ["server"] }
jsonrpsee = { version = "0.22", features = ["server"] }
log = { workspace = true, default-features = true }
serde_json = "1.0.111"
tokio = { version = "1.22.0", features = ["parking_lot"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus" }
tower-http = { version = "0.4.0", features = ["cors"] }
tower = { version = "0.4.13", features = ["util"] }
http = "0.2.8"
hyper = "0.14.27"
futures = "0.3.29"
pin-project = "1.1.3"
101 changes: 85 additions & 16 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,32 @@

pub mod middleware;

use std::{error::Error as StdError, net::SocketAddr, time::Duration};
use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};

use http::header::HeaderValue;
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use jsonrpsee::{
server::middleware::{HostFilterLayer, ProxyGetRequestLayer},
RpcModule,
server::{
middleware::{
http::{HostFilterLayer, ProxyGetRequestLayer},
rpc::RpcServiceBuilder,
},
stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
},
Methods, RpcModule,
};
use tokio::net::TcpListener;
use tower::Service;
use tower_http::cors::{AllowOrigin, CorsLayer};

pub use crate::middleware::RpcMetrics;
pub use jsonrpsee::core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
};
pub use middleware::{MetricsLayer, RpcMetrics};

const MEGABYTE: u32 = 1024 * 1024;

Expand Down Expand Up @@ -92,7 +103,7 @@ pub async fn start_server<M: Send + Sync + 'static>(
let local_addr = std_listener.local_addr().ok();
let host_filter = hosts_filtering(cors.is_some(), local_addr);

let middleware = tower::ServiceBuilder::new()
let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
Expand All @@ -103,33 +114,82 @@ pub async fn start_server<M: Send + Sync + 'static>(
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.ping_interval(Duration::from_secs(30))
.set_middleware(middleware)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(message_buffer_capacity)
.custom_tokio_runtime(tokio_handle);
.custom_tokio_runtime(tokio_handle.clone());

if let Some(provider) = id_provider {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let rpc_api = build_rpc_api(rpc_api);
let handle = if let Some(metrics) = metrics {
let server = builder.set_logger(metrics).build_from_tcp(std_listener)?;
server.start(rpc_api)
} else {
let server = builder.build_from_tcp(std_listener)?;
server.start(rpc_api)
let (stop_handle, server_handle) = stop_channel();
let cfg = PerConnection {
methods: build_rpc_api(rpc_api).into(),
service_builder: builder.to_service_builder(),
metrics,
tokio_handle,
stop_handle: stop_handle.clone(),
};

let make_service = make_service_fn(move |_conn: &AddrStream| {
let cfg = cfg.clone();

async move {
let cfg = cfg.clone();

Ok::<_, Infallible>(service_fn(move |req| {
let PerConnection { service_builder, metrics, tokio_handle, stop_handle, methods } =
cfg.clone();

let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };

let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label));
let rpc_middleware = RpcServiceBuilder::new().option_layer(metrics.clone());
let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);

async move {
if is_websocket {
let on_disconnect = svc.on_session_closed();

// Spawn a task to handle when the connection is closed.
tokio_handle.spawn(async move {
let now = std::time::Instant::now();
metrics.as_ref().map(|m| m.ws_connect());
on_disconnect.await;
metrics.as_ref().map(|m| m.ws_disconnect(now));
});
}

svc.call(req).await
}
}))
}
});

let server = hyper::Server::from_tcp(std_listener)?.serve(make_service);

tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move { stop_handle.shutdown().await });
let _ = graceful.await;
});

log::info!(
"Running JSON-RPC server: addr={}, allowed origins={}",
local_addr.map_or_else(|| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);

Ok(handle)
Ok(server_handle)
}

fn hosts_filtering(enabled: bool, addr: Option<SocketAddr>) -> Option<HostFilterLayer> {
Expand Down Expand Up @@ -185,3 +245,12 @@ fn format_cors(maybe_cors: Option<&Vec<String>>) -> String {
format!("{:?}", ["*"])
}
}

#[derive(Clone)]
struct PerConnection<RpcMiddleware, HttpMiddleware> {
methods: Methods,
stop_handle: StopHandle,
metrics: Option<RpcMetrics>,
tokio_handle: tokio::runtime::Handle,
service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
}
Loading

0 comments on commit 9a55dd3

Please sign in to comment.