Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[servers]: subscriptionSink support params. #318

Closed
wants to merge 32 commits into from

Conversation

niklasad1
Copy link
Member

@niklasad1 niklasad1 commented May 12, 2021

Currently, the params within a subscription request is kept inside the server thus not possible to access the params.

This PR changes the SubscriptionSink<Param> to take a type parameter that must correspond to the param in the subscription request otherwise the request will be denied (not send back a subscription ID)

It adds two additional APIs:

  • call_with_params: a way to apply a closure to the params and send out the response on that specific subscription
  • to_sinks: consumes the current subscriptions, to expose the inner subscription sinks.

@niklasad1 niklasad1 changed the title [ws server]: subscriptionSink with input design 2 [ws server]: subscriptionSink, same API for register subscription with or without params. May 17, 2021
dvdplm and others added 3 commits May 17, 2021 12:52
* Add a test for calling methods with multiple params of multiple types (#308)

* Add a test for calling methods with multiple params of multiple types

* cargo fmt

Co-authored-by: Niklas Adolfsson <[email protected]>

* [ws client] RegisterNotification support (#303)

* Rename NotifResponse to SubscriptionResponse to make room for new impl

* Add support for on_notification Subscription<T> types

* Fix handling of NotificationHandler in manager

* cleanup

* Implement NotificationHandler to replace Subscription<T> and clean up plumbing

* More cleanup

* impl Drop for NotificationHandler

* Address pr feedback #1

* ws client register_notification pr feedback 2

* Fix doc

* fix typo

* Add tests, get NH working

* More cleanup of String/&str

* fix doc

* Drop notification handler on send_back_sink error

* ws client notification auto unsubscribe when channel full test

* Change order of type params to register_method (#312)

* Change order of type params to register_method

* Cleanup and fmt

* Update ws-server/src/tests.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* CI: optimize caching (#317)

* Bump actions/checkout from 2 to 2.3.4 (#315)

Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 2.3.4.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v2...v2.3.4)

Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions-rs/cargo from 1 to 1.0.3 (#314)

Bumps [actions-rs/cargo](https://github.com/actions-rs/cargo) from 1 to 1.0.3.
- [Release notes](https://github.com/actions-rs/cargo/releases)
- [Changelog](https://github.com/actions-rs/cargo/blob/master/CHANGELOG.md)
- [Commits](actions-rs/cargo@v1...v1.0.3)

Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions-rs/toolchain from 1 to 1.0.7 (#313)

Bumps [actions-rs/toolchain](https://github.com/actions-rs/toolchain) from 1 to 1.0.7.
- [Release notes](https://github.com/actions-rs/toolchain/releases)
- [Changelog](https://github.com/actions-rs/toolchain/blob/master/CHANGELOG.md)
- [Commits](actions-rs/toolchain@v1...v1.0.7)

Signed-off-by: dependabot[bot] <[email protected]>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [ws server]: add logs (#319)

* WIP - hangs

* fix example

* cleanup

* Add certificate_store() to WsClientBuilder (#321)

* Add custom_certificate to WsClientBuilder

* Use system certs instead of specified file

* Cache client_config

* Move client_config logic to fn build

* Default use_system_certificates to true

* Move out connector

* Add CertificateStore type

* cargo fmt

* cargo clippy

* Resolve comment: Rename variable

* Resolved comments

Co-authored-by: Niklas Adolfsson <[email protected]>
Co-authored-by: Billy Lindeman <[email protected]>
Co-authored-by: Denis Pisarev <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Albin Hedman <[email protected]>
Copy link
Contributor

@maciejhirsz maciejhirsz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to have a bigger think about this and #316. I think I prefer this PR since it's simpler, and the InnerSink makes sense to me.

@niklasad1
Copy link
Member Author

I had to have a bigger think about this and #316. I think I prefer this PR since it's simpler, and the InnerSink makes sense to me.

Ok, let's then polish this and create some benchmarks it on after @dvdplm grumbles ^^

@niklasad1 niklasad1 changed the title [ws server]: subscriptionSink, same API for register subscription with or without params. [rpc module]: subscriptionSink support params. May 19, 2021
@niklasad1 niklasad1 changed the title [rpc module]: subscriptionSink support params. [servers]: subscriptionSink support params. May 20, 2021
Copy link
Contributor

@dvdplm dvdplm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, modulo a bunch of nits and some naming.

@dvdplm dvdplm requested a review from maciejhirsz May 20, 2021 09:02
Copy link
Contributor

@maciejhirsz maciejhirsz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks good to me, but I still have some opinionated suggestions to the API, feel free to disagree with anything :).

/// F: is a closure that you need to provide to apply on the input P.
pub fn send_each<T, F>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(&mut Params) -> T,
Copy link
Contributor

@maciejhirsz maciejhirsz May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Params being &mut looks like a foot gun to me.

I also think the closure should support error handling in user land:

F: Fn(&Params) -> anyhow::Result<Option<T>>, where:

  • Ok(Some(T)) - no errors, send T to the subscriber.
  • Ok(None) - no errors, don't send anything to the subscriber.
  • Err(_) - error, log it (not sure if we need to send anything back?).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would have to be F: Fn(&Option<Params>) -> anyhow::Result<Option<T>> I think?

This seems to work:

	pub fn send_each<T, F>(&self, f: F) -> anyhow::Result<()>
	where
		F: Fn(&Option<Params>) -> anyhow::Result<Option<T>>,
		T: Serialize,
	{
		let mut subs = self.subscribers.lock();
		let mut errored = Vec::new();

		for ((conn_id, sub_id), sender) in subs.iter_mut() {
			match f(&sender.params) {
				Ok(Some(res)) => {
					let msg = serde_json::to_string(&JsonRpcSubscriptionResponse {
						jsonrpc: TwoPointZero,
						method: self.method,
						params: JsonRpcNotificationParams { subscription: *sub_id, result: &to_raw_value(&res).expect("TODO what to do here?") },
					})?;

					// Mark broken connections, to be removed.
					if sender.sink.unbounded_send(msg).is_err() {
						errored.push((*conn_id, *sub_id));
					}
				},
				Ok(None) => log::warn!("Calling `send_each` for subscriptions that don't use params is… bad?"),
				Err(e) => log::error!("err: {:?}", e)
			}
		}

		// Remove broken connections
		for entry in errored {
			subs.remove(&entry);
		}

		Ok(())
	}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Params being Option<Params> instead of just Params is a separate issue, see the other comment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's neat.

let mut errored = Vec::new();

for ((conn_id, sub_id), sender) in subs.iter_mut() {
let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) {
let result = match sender.params.as_ref().map(f) {

Closure return type is known, and it's ok if it's transient since it's going to be serialized in the same block, so there is no reason to use RawValue here.

Copy link
Member Author

@niklasad1 niklasad1 May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason was that this requires RawValue, it's sent a few lines below this.

But I could split it out to make it more clean/clear I suppose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just make the struct generic, no? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's possible but I'd prefer to do it another PR :)

None => Err(CallError::InvalidParams),
Some(params) => serde_json::from_str(params).map_err(|_| CallError::InvalidParams),
}
let params = self.0.unwrap_or("null");
Copy link
Member Author

@niklasad1 niklasad1 May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cover the case if the Params is Option::None or () but maybe it's weird because technically not according to the spec but wasn't required before either.

type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), MethodSink>>>;
/// Map of subscribers keyed by the connection and subscription ids to an [`InnerSink`] that contains the parameters
/// they used to subscribe and the tx side of a channel used to convey results and errors back.
type Subscribers<P> = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), InnerSink<P>>>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@niklasad1 niklasad1 marked this pull request as ready for review May 24, 2021 11:45
@niklasad1
Copy link
Member Author

Superseeded by #336

@niklasad1 niklasad1 closed this May 26, 2021
@niklasad1 niklasad1 deleted the na-subscription-design2 branch August 5, 2022 08:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants