Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[servers]: subscriptionSink support params. #318

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
622d244
[ws server]: draft SubscriptionSinkWithParams
niklasad1 May 12, 2021
4f1743a
rexport types
niklasad1 May 12, 2021
1e5d0c3
PoC design2
niklasad1 May 12, 2021
420d13d
improve example
niklasad1 May 13, 2021
e041d32
Merge branch 'master' into na-subscription-design2
dvdplm May 14, 2021
6b1ab8b
Update ws-server/src/server.rs
niklasad1 May 17, 2021
8605f63
Merge branch 'na-subscription-design2' of github.com:paritytech/jsonr…
dvdplm May 17, 2021
531e805
Merge remote-tracking branch 'origin/master' into na-subscription-des…
dvdplm May 18, 2021
6164687
Subscription example (#324)
dvdplm May 18, 2021
b381db5
Merge remote-tracking branch 'origin/master' into na-subscription-des…
niklasad1 May 19, 2021
7ef0f60
grumbles: impl maciej proposal
niklasad1 May 19, 2021
89ba833
fix test build
niklasad1 May 19, 2021
fa16a5c
add test for subscription with param
niklasad1 May 19, 2021
d61f47a
cargo fmt
niklasad1 May 19, 2021
aaebe60
Update examples/ws_subscription.rs
niklasad1 May 20, 2021
fa2a82f
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
59e24f5
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
21f3a92
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
57009b2
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
c831661
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
5269e53
grumbles
niklasad1 May 20, 2021
4e8ed83
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
ca777fb
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
27f8cc7
Update utils/src/server/rpc_module.rs
niklasad1 May 20, 2021
f27e101
fix more grumbles
niklasad1 May 20, 2021
918b5d8
Merge branch 'na-subscription-design2' of github.com:paritytech/jsonr…
niklasad1 May 20, 2021
29ccc6e
Merge remote-tracking branch 'origin/master' into na-subscription-des…
niklasad1 May 21, 2021
ca4ad3f
[subscriptionSink]: introduce into_sinks
niklasad1 May 21, 2021
a5b86b2
use replace
niklasad1 May 21, 2021
5e50b8e
fix more nits
niklasad1 May 21, 2021
ec3e039
Merge branch 'master' into na-subscription-design2
dvdplm May 24, 2021
9249e45
Remove comment
dvdplm May 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ path = "ws.rs"
name = "ws_subscription"
path = "ws_subscription.rs"

[[example]]
name = "ws_sub_with_params"
path = "ws_sub_with_params.rs"

[[example]]
name = "proc_macro"
path = "proc_macro.rs"
82 changes: 82 additions & 0 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
ws_client::{traits::SubscriptionClient, v2::params::JsonRpcParams, WsClientBuilder},
ws_server::WsServer,
};
use std::net::SocketAddr;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let addr = run_server().await?;
let url = format!("ws://{}", addr);

let client = WsClientBuilder::default().build(&url).await?;

// Subscription with a single parameter
let params = JsonRpcParams::Array(vec![3.into()]);
let mut sub_params_one = client.subscribe::<Option<char>>("sub_one_param", params, "unsub_one_param").await?;
println!("subscription with one param: {:?}", sub_params_one.next().await);

// Subscription with multiple parameters
let params = JsonRpcParams::Array(vec![2.into(), 5.into()]);
let mut sub_params_two = client.subscribe::<String>("sub_params_two", params, "unsub_params_two").await?;
println!("subscription with two params: {:?}", sub_params_two.next().await);

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
const LETTERS: &'static str = "abcdefghijklmnopqrstuvxyz";
let mut server = WsServer::new("127.0.0.1:0").await?;
let one_param = server.register_subscription("sub_one_param", "unsub_one_param").unwrap();
let two_params = server.register_subscription("sub_params_two", "unsub_params_two").unwrap();

std::thread::spawn(move || loop {
for sink_params in one_param.extract_with_input().iter() {
let idx = *sink_params.params();
let result = LETTERS.chars().nth(idx);
let _ = sink_params.send(&result);
}
std::thread::sleep(std::time::Duration::from_millis(50));
});

std::thread::spawn(move || loop {
for sink_params in two_params.extract_with_input().iter() {
let params: &Vec<usize> = sink_params.params();
// Validate your params here: check len, check > 0 etc
let result = LETTERS[params[0]..params[1]].to_string();
let _ = sink_params.send(&result);
}
std::thread::sleep(std::time::Duration::from_millis(100));
});

let addr = server.local_addr();
tokio::spawn(async move { server.start().await });
addr
}
4 changes: 2 additions & 2 deletions examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ async fn main() -> anyhow::Result<()> {

async fn run_server() -> anyhow::Result<SocketAddr> {
let mut server = WsServer::new("127.0.0.1:0").await?;
let mut subscription = server.register_subscription("subscribe_hello", "unsubscribe_hello").unwrap();
let mut subscription = server.register_subscription::<String>("subscribe_hello", "unsubscribe_hello").unwrap();

std::thread::spawn(move || loop {
subscription.send(&"hello my friend").unwrap();
subscription.send_all(&"hello my friend").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
});

Expand Down
4 changes: 3 additions & 1 deletion ws-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ mod server;
mod tests;

pub use jsonrpsee_types::error::Error;
pub use server::{RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink};
pub use server::{
InnerSink, RpcContextModule, RpcModule, Server as WsServer, SubscriptionSink
};
107 changes: 95 additions & 12 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::StreamExt;
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::value::to_raw_value;
use soketto::handshake::{server::Response, Server as SokettoServer};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use std::convert::{TryFrom, TryInto};
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
Expand All @@ -49,16 +49,21 @@ mod module;
pub use module::{RpcContextModule, RpcModule};

type SubscriptionId = u64;
type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), mpsc::UnboundedSender<String>>>>;
type Subscribers<P> = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), InnerSink<P>>>>;

#[derive(Clone)]
pub struct SubscriptionSink {
pub struct SubscriptionSink<P> {
method: &'static str,
subscribers: Subscribers,
subscribers: Subscribers<P>,
}

impl SubscriptionSink {
pub fn send<T>(&mut self, result: &T) -> anyhow::Result<()>
impl<P> SubscriptionSink<P> {
/// Send a message on all the subscriptions
///
/// If you have subscriptions with params/input you should most likely
/// call `extract_with_input` to the process the input/params and send out
/// the result on each subscription individually instead.
pub fn send_all<T>(&mut self, result: &T) -> anyhow::Result<()>
where
T: Serialize,
{
Expand All @@ -75,7 +80,7 @@ impl SubscriptionSink {
})?;

// Log broken connections
if sender.unbounded_send(msg).is_err() {
if sender.sink.unbounded_send(msg).is_err() {
errored.push((*conn_id, *sub_id));
}
}
Expand All @@ -87,6 +92,85 @@ impl SubscriptionSink {

Ok(())
}

/// Extract subscriptions with input.
/// Usually, you want process the input before sending back
/// data on that subscription.
pub fn extract_with_input(&self) -> Vec<InnerSinkWithParams<P>> {
let mut subs = self.subscribers.lock();

let mut input = Vec::new();
*subs = std::mem::take(&mut *subs)
.into_iter()
.filter_map(|(k, v)| {
if v.params.is_some() {
let with_input = v.try_into().expect("is Some checked above; qed");
input.push(with_input);
None
} else {
Some((k, v))
}
})
.collect();
input
}
}

pub struct InnerSink<P> {
/// Sink.
sink: mpsc::UnboundedSender<String>,
/// Params.
params: Option<P>,
/// Subscription ID.
sub_id: SubscriptionId,
/// Method name
method: &'static str,
}

pub struct InnerSinkWithParams<P> {
/// Sink.
sink: mpsc::UnboundedSender<String>,
/// Params.
params: P,
/// Subscription ID.
sub_id: SubscriptionId,
/// Method name
method: &'static str,
}

impl<P> TryFrom<InnerSink<P>> for InnerSinkWithParams<P> {
type Error = ();

fn try_from(other: InnerSink<P>) -> Result<Self, Self::Error> {
match other.params {
Some(params) => Ok(InnerSinkWithParams { sink: other.sink, params, sub_id: other.sub_id, method: other.method }),
None => Err(())
}
}
}

impl<P> InnerSinkWithParams<P> {
/// Send data on a specific subscription
/// Note: a subscription can be "subscribed" to arbitary number of times with
/// diffrent input/params.
pub fn send<T>(&self, result: &T) -> anyhow::Result<()>
where
T: Serialize,
{
let result = to_raw_value(result)?;
let msg = serde_json::to_string(&JsonRpcNotification {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: self.sub_id, result: &*result },
})?;

self.sink.unbounded_send(msg).map_err(|e| anyhow::anyhow!("{:?}", e))
}

/// Get the input/params of the subscrption.
pub fn params(&self) -> &P {
&self.params
}
}

pub struct Server {
Expand All @@ -112,14 +196,13 @@ impl Server {
}

/// Register a new RPC subscription, with subscribe and unsubscribe methods.
pub fn register_subscription(
pub fn register_subscription<P: DeserializeOwned + Send + Sync + 'static>(
&mut self,
subscribe_method_name: &'static str,
unsubscribe_method_name: &'static str,
) -> Result<SubscriptionSink, Error> {
) -> Result<SubscriptionSink<P>, Error> {
self.root.register_subscription(subscribe_method_name, unsubscribe_method_name)
}

/// Register all methods from a module on this server.
pub fn register_module(&mut self, module: RpcModule) -> Result<(), Error> {
self.root.merge(module)
Expand Down
19 changes: 13 additions & 6 deletions ws-server/src/server/module.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::server::{RpcParams, SubscriptionId, SubscriptionSink};
use crate::server::{RpcParams, SubscriptionId, SubscriptionSink, InnerSink};
use jsonrpsee_types::{
error::{CallError, Error},
v2::error::{JsonRpcErrorCode, JsonRpcErrorObject},
Expand All @@ -7,7 +7,7 @@ use jsonrpsee_types::{traits::RpcMethod, v2::error::CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_utils::server::{send_error, send_response, Methods};
use parking_lot::Mutex;
use rustc_hash::FxHashMap;
use serde::Serialize;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;

#[derive(Default)]
Expand Down Expand Up @@ -67,11 +67,11 @@ impl RpcModule {
}

/// Register a new RPC subscription, with subscribe and unsubscribe methods.
pub fn register_subscription(
pub fn register_subscription<P: DeserializeOwned + Send + Sync + 'static>(
&mut self,
subscribe_method_name: &'static str,
unsubscribe_method_name: &'static str,
) -> Result<SubscriptionSink, Error> {
) -> Result<SubscriptionSink<P>, Error> {
if subscribe_method_name == unsubscribe_method_name {
return Err(Error::SubscriptionNameConflict(subscribe_method_name.into()));
}
Expand All @@ -85,13 +85,20 @@ impl RpcModule {
let subscribers = subscribers.clone();
self.methods.insert(
subscribe_method_name,
Box::new(move |id, _, tx, conn| {
Box::new(move |id, params, tx, conn| {
let params = params.parse().or_else(|_| params.one().map_err(|_| CallError::InvalidParams)).ok();
let sub_id = {
const JS_NUM_MASK: SubscriptionId = !0 >> 11;

let sub_id = rand::random::<SubscriptionId>() & JS_NUM_MASK;

subscribers.lock().insert((conn, sub_id), tx.clone());
let inner = InnerSink {
sink: tx.clone(),
sub_id,
params,
method: subscribe_method_name
};
subscribers.lock().insert((conn, sub_id), inner);

sub_id
};
Expand Down