-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[servers]: subscriptionSink support params. #318
Conversation
Co-authored-by: David <[email protected]>
…psee into na-subscription-design2
* Add a test for calling methods with multiple params of multiple types (#308) * Add a test for calling methods with multiple params of multiple types * cargo fmt Co-authored-by: Niklas Adolfsson <[email protected]> * [ws client] RegisterNotification support (#303) * Rename NotifResponse to SubscriptionResponse to make room for new impl * Add support for on_notification Subscription<T> types * Fix handling of NotificationHandler in manager * cleanup * Implement NotificationHandler to replace Subscription<T> and clean up plumbing * More cleanup * impl Drop for NotificationHandler * Address pr feedback #1 * ws client register_notification pr feedback 2 * Fix doc * fix typo * Add tests, get NH working * More cleanup of String/&str * fix doc * Drop notification handler on send_back_sink error * ws client notification auto unsubscribe when channel full test * Change order of type params to register_method (#312) * Change order of type params to register_method * Cleanup and fmt * Update ws-server/src/tests.rs Co-authored-by: Niklas Adolfsson <[email protected]> * CI: optimize caching (#317) * Bump actions/checkout from 2 to 2.3.4 (#315) Bumps [actions/checkout](https://github.com/actions/checkout) from 2 to 2.3.4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](actions/checkout@v2...v2.3.4) Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/cargo from 1 to 1.0.3 (#314) Bumps [actions-rs/cargo](https://github.com/actions-rs/cargo) from 1 to 1.0.3. - [Release notes](https://github.com/actions-rs/cargo/releases) - [Changelog](https://github.com/actions-rs/cargo/blob/master/CHANGELOG.md) - [Commits](actions-rs/cargo@v1...v1.0.3) Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump actions-rs/toolchain from 1 to 1.0.7 (#313) Bumps [actions-rs/toolchain](https://github.com/actions-rs/toolchain) from 1 to 1.0.7. - [Release notes](https://github.com/actions-rs/toolchain/releases) - [Changelog](https://github.com/actions-rs/toolchain/blob/master/CHANGELOG.md) - [Commits](actions-rs/toolchain@v1...v1.0.7) Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [ws server]: add logs (#319) * WIP - hangs * fix example * cleanup * Add certificate_store() to WsClientBuilder (#321) * Add custom_certificate to WsClientBuilder * Use system certs instead of specified file * Cache client_config * Move client_config logic to fn build * Default use_system_certificates to true * Move out connector * Add CertificateStore type * cargo fmt * cargo clippy * Resolve comment: Rename variable * Resolved comments Co-authored-by: Niklas Adolfsson <[email protected]> Co-authored-by: Billy Lindeman <[email protected]> Co-authored-by: Denis Pisarev <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Albin Hedman <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to have a bigger think about this and #316. I think I prefer this PR since it's simpler, and the InnerSink
makes sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, modulo a bunch of nits and some naming.
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good to me, but I still have some opinionated suggestions to the API, feel free to disagree with anything :).
utils/src/server/rpc_module.rs
Outdated
/// F: is a closure that you need to provide to apply on the input P. | ||
pub fn send_each<T, F>(&self, f: F) -> anyhow::Result<()> | ||
where | ||
F: Fn(&mut Params) -> T, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Params
being &mut
looks like a foot gun to me.
I also think the closure should support error handling in user land:
F: Fn(&Params) -> anyhow::Result<Option<T>>
, where:
Ok(Some(T))
- no errors, sendT
to the subscriber.Ok(None)
- no errors, don't send anything to the subscriber.Err(_)
- error, log it (not sure if we need to send anything back?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would have to be F: Fn(&Option<Params>) -> anyhow::Result<Option<T>>
I think?
This seems to work:
pub fn send_each<T, F>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(&Option<Params>) -> anyhow::Result<Option<T>>,
T: Serialize,
{
let mut subs = self.subscribers.lock();
let mut errored = Vec::new();
for ((conn_id, sub_id), sender) in subs.iter_mut() {
match f(&sender.params) {
Ok(Some(res)) => {
let msg = serde_json::to_string(&JsonRpcSubscriptionResponse {
jsonrpc: TwoPointZero,
method: self.method,
params: JsonRpcNotificationParams { subscription: *sub_id, result: &to_raw_value(&res).expect("TODO what to do here?") },
})?;
// Mark broken connections, to be removed.
if sender.sink.unbounded_send(msg).is_err() {
errored.push((*conn_id, *sub_id));
}
},
Ok(None) => log::warn!("Calling `send_each` for subscriptions that don't use params is… bad?"),
Err(e) => log::error!("err: {:?}", e)
}
}
// Remove broken connections
for entry in errored {
subs.remove(&entry);
}
Ok(())
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Params being Option<Params>
instead of just Params
is a separate issue, see the other comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's neat.
utils/src/server/rpc_module.rs
Outdated
let mut errored = Vec::new(); | ||
|
||
for ((conn_id, sub_id), sender) in subs.iter_mut() { | ||
let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let result = match sender.params.as_mut().map(|p| to_raw_value(&f(p))) { | |
let result = match sender.params.as_ref().map(f) { |
Closure return type is known, and it's ok if it's transient since it's going to be serialized in the same block, so there is no reason to use RawValue
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason was that this requires RawValue, it's sent a few lines below this.
But I could split it out to make it more clean/clear I suppose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just make the struct generic, no? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's possible but I'd prefer to do it another PR :)
Co-authored-by: Maciej Hirsz <[email protected]>
…psee into na-subscription-design2
None => Err(CallError::InvalidParams), | ||
Some(params) => serde_json::from_str(params).map_err(|_| CallError::InvalidParams), | ||
} | ||
let params = self.0.unwrap_or("null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cover the case if the Params
is Option::None or () but maybe it's weird because technically not according to the spec but wasn't required before either.
type Subscribers = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), MethodSink>>>; | ||
/// Map of subscribers keyed by the connection and subscription ids to an [`InnerSink`] that contains the parameters | ||
/// they used to subscribe and the tx side of a channel used to convey results and errors back. | ||
type Subscribers<P> = Arc<Mutex<FxHashMap<(ConnectionId, SubscriptionId), InnerSink<P>>>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Superseeded by #336 |
Currently, the params within a subscription request is kept inside the server thus not possible to access the params.
This PR changes the
SubscriptionSink<Param>
to take a type parameter that must correspond to the param in the subscription request otherwise the request will be denied (not send back a subscription ID)It adds two additional APIs:
call_with_params
: a way to apply a closure to the params and send out the response on that specific subscriptionto_sinks
: consumes the current subscriptions, to expose the inner subscription sinks.