Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[servers]: subscriptionSink support params. #318

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
622d244
[ws server]: draft SubscriptionSinkWithParams
niklasad1 May 12, 2021
4f1743a
rexport types
niklasad1 May 12, 2021
1e5d0c3
PoC design2
niklasad1 May 12, 2021
420d13d
improve example
niklasad1 May 13, 2021
e041d32
Merge branch 'master' into na-subscription-design2
dvdplm May 14, 2021
6b1ab8b
Update ws-server/src/server.rs
niklasad1 May 17, 2021
8605f63
Merge branch 'na-subscription-design2' of github.com:paritytech/jsonr…
dvdplm May 17, 2021
531e805
Merge remote-tracking branch 'origin/master' into na-subscription-des…
dvdplm May 18, 2021
6164687
Subscription example (#324)
dvdplm May 18, 2021
b381db5
Merge remote-tracking branch 'origin/master' into na-subscription-des…
niklasad1 May 19, 2021
7ef0f60
grumbles: impl maciej proposal
niklasad1 May 19, 2021
89ba833
fix test build
niklasad1 May 19, 2021
fa16a5c
add test for subscription with param
niklasad1 May 19, 2021
d61f47a
cargo fmt
niklasad1 May 19, 2021
aaebe60
Update examples/ws_subscription.rs
niklasad1 May 20, 2021
fa2a82f
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
59e24f5
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
21f3a92
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
57009b2
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
c831661
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
5269e53
grumbles
niklasad1 May 20, 2021
4e8ed83
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
ca777fb
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
27f8cc7
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
f27e101
fix more grumbles
niklasad1 May 20, 2021
918b5d8
Merge branch 'na-subscription-design2' of github.com:paritytech/jsonr…
niklasad1 May 20, 2021
29ccc6e
Merge remote-tracking branch 'origin/master' into na-subscription-des…
niklasad1 May 21, 2021
ca4ad3f
[subscriptionSink]: introduce into_sinks
niklasad1 May 21, 2021
a5b86b2
use replace
niklasad1 May 21, 2021
5e50b8e
fix more nits
niklasad1 May 21, 2021
ec3e039
Merge branch 'master' into na-subscription-design2
dvdplm May 24, 2021
9249e45
Remove comment
dvdplm May 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ path = "ws.rs"
name = "ws_subscription"
path = "ws_subscription.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"

[[example]]
name = "proc_macro"
path = "proc_macro.rs"
73 changes: 73 additions & 0 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;

// Subscription with a single parameter
let params = JsonRpcParams::Array(vec![3.into()]);
let mut sub_params_one = client.subscribe::<Option<char>>("sub_one_param", params, "unsub_one_param").await?;
println!("subscription with one param: {:?}", sub_params_one.next().await);

// Subscription with multiple parameters
let params = JsonRpcParams::Array(vec![2.into(), 5.into()]);
let mut sub_params_two = client.subscribe::<String>("sub_params_two", params, "unsub_params_two").await?;
println!("subscription with two params: {:?}", sub_params_two.next().await);

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz";
let mut server = WsServer::new("127.0.0.1:0").await?;
let one_param = server.register_subscription("sub_one_param", "unsub_one_param").unwrap();
let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap();

std::thread::spawn(move || loop {
let _ = one_param.send_each(|idx| Ok(LETTERS.chars().nth(*idx)));
std::thread::sleep(std::time::Duration::from_millis(50));
});

std::thread::spawn(move || loop {
let _ = two_params.send_each(|params: &Vec<usize>| Ok(Some(LETTERS[params[0]..params[1]].to_string())));
std::thread::sleep(std::time::Duration::from_millis(100));
});

let addr = server.local_addr()?;
tokio::spawn(async move { server.start().await });
Ok(addr)
}
4 changes: 2 additions & 2 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServer::new("127.0.0.1:0").await?;
let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();
let subscription = server.register_subscription::<()>("subscribe_hello", "unsubscribe_hello").unwrap();

std::thread::spawn(move || loop {
subscription.send(&"hello my friend").unwrap();
subscription.broadcast(&"hello my friend").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
});

Expand Down
18 changes: 12 additions & 6 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
// DEALINGS IN THE SOFTWARE.

use futures_channel::oneshot;
use jsonrpsee::{http_server::HttpServerBuilder, ws_server::WsServer};
use jsonrpsee::{
http_server::HttpServerBuilder,
ws_server::{SubscriptionSink, WsServer},
};
use std::net::SocketAddr;
use std::time::Duration;

Expand All @@ -36,9 +39,11 @@ pub async fn websocket_server_with_subscription() -> SocketAddr {
let rt = tokio::runtime::Runtime::new().unwrap();

let mut server = rt.block_on(WsServer::new("127.0.0.1:0")).unwrap();
let mut sub_hello = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();
let mut sub_foo = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap();

let sub_hello: SubscriptionSink<()> =
server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();
let sub_foo: SubscriptionSink<()> = server.register_subscription("subscribe_foo", "unsubscribe_foo").unwrap();
let sub_add_one: SubscriptionSink<u64> =
server.register_subscription("subscribe_add_one", "unsubscribe_add_one").unwrap();
server.register_method("say_hello", |_| Ok("hello")).unwrap();

server_started_tx.send(server.local_addr().unwrap()).unwrap();
Expand All @@ -49,8 +54,9 @@ pub async fn websocket_server_with_subscription() -> SocketAddr {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;

sub_hello.send(&"hello from subscription").unwrap();
sub_foo.send(&1337_u64).unwrap();
sub_hello.broadcast(&"hello from subscription").unwrap();
sub_foo.broadcast(&1337_u64).unwrap();
sub_add_one.send_each(|p| Ok(Some(*p + 1))).unwrap();
}
});
});
Expand Down
14 changes: 14 additions & 0 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ async fn ws_subscription_works() {
}
}

#[tokio::test]
async fn ws_subscription_with_input_works() {
let server_addr = websocket_server_with_subscription().await;
let server_url = format!("ws://{}", server_addr);
let client = WsClientBuilder::default().build(&server_url).await.unwrap();
let mut add_one: Subscription<u64> =
client.subscribe("subscribe_add_one", vec![1.into()].into(), "unsubscribe_add_one").await.unwrap();

for _ in 0..2 {
let two = add_one.next().await.unwrap();
assert_eq!(two, 2);
}
}

#[tokio::test]
async fn ws_method_call_works() {
let server_addr = websocket_server().await;
Expand Down
2 changes: 1 addition & 1 deletion types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {
Request(String),
/// Frontend/backend channel error.
#[error("Frontend/backend channel error: {0}")]
Internal(#[source] futures_channel::mpsc::SendError),
Internal(#[from] futures_channel::mpsc::SendError),
/// Invalid response,
#[error("Invalid response: {0}")]
InvalidResponse(Mismatch<String>),
Expand Down
6 changes: 2 additions & 4 deletions types/src/v2/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ impl<'a> RpcParams<'a> {
where
T: Deserialize<'a>,
{
match self.0 {
None => Err(CallError::InvalidParams),
Some(params) => serde_json::from_str(params).map_err(|_| CallError::InvalidParams),
}
let params = self.0.unwrap_or("null");
Copy link
Member Author

@niklasad1 niklasad1 May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cover the case if the Params is Option::None or () but maybe it's weird because technically not according to the spec but wasn't required before either.

serde_json::from_str(params).map_err(|_| CallError::InvalidParams)
}

/// Attempt to parse only the first parameter from an array into type T
Expand Down
Loading