Skip to content

Commit

Permalink
fix(transport): rework trait bounds
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jun 21, 2024
1 parent b830439 commit a1ea004
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 123 deletions.
2 changes: 1 addition & 1 deletion crates/runtime-wasmtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ pub fn polyfill<'a, T, C, V>(
let Invocation { outgoing, incoming, session } = store
.data()
.client()
.invoke(cx, &instance_name, &rpc_name, buf.freeze(), &[])
.invoke(cx, &instance_name, &rpc_name, buf.freeze(), &[[]; 0])
.await
.with_context(|| {
format!("failed to invoke `{instance_name}.{func_name}` polyfill via wRPC")
Expand Down
145 changes: 73 additions & 72 deletions crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(clippy::type_complexity)]

use core::borrow::Borrow;
use core::future::Future;
use core::iter::zip;
use core::pin::{pin, Pin};
Expand Down Expand Up @@ -171,11 +170,11 @@ impl<'a> From<(&'a [Option<usize>], Subscriber)> for SubscriberTree {
}
}

impl<'a, P: Borrow<&'a [Option<usize>]>> FromIterator<(P, Subscriber)> for SubscriberTree {
impl<P: AsRef<[Option<usize>]>> FromIterator<(P, Subscriber)> for SubscriberTree {
fn from_iter<T: IntoIterator<Item = (P, Subscriber)>>(iter: T) -> Self {
let mut root = Self::Empty;
for (path, sub) in iter {
if !root.insert(path.borrow(), sub) {
if !root.insert(path.as_ref(), sub) {
return Self::Empty;
}
}
Expand Down Expand Up @@ -915,14 +914,14 @@ impl wrpc_transport::Invoke for Client {
type Outgoing = ParamWriter;
type Incoming = Reader;

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace", skip(self, paths))]
async fn invoke(
&self,
cx: Self::Context,
instance: &str,
func: &str,
mut params: Bytes,
paths: &[&[Option<usize>]],
paths: &[impl AsRef<[Option<usize>]> + Send + Sync],
) -> anyhow::Result<wrpc_transport::Invocation<Self::Outgoing, Self::Incoming, Self::Session>>
{
let rx = Subject::from(self.nats.new_inbox());
Expand All @@ -947,7 +946,7 @@ impl wrpc_transport::Invoke for Client {
},
futures::future::try_join_all(paths.iter().map(|path| async {
self.nats
.subscribe(Subject::from(subscribe_path(&rx, path)))
.subscribe(Subject::from(subscribe_path(&rx, path.as_ref())))
.await
.context("failed to subscribe on nested result subject")
}))
Expand Down Expand Up @@ -1028,87 +1027,89 @@ impl wrpc_transport::Serve for Client {
type Outgoing = SubjectWriter;
type Incoming = Reader;

#[instrument(level = "trace", skip(self))]
async fn serve(
#[instrument(level = "trace", skip(self, paths))]
async fn serve<P: AsRef<[Option<usize>]> + Send + Sync + 'static>(
&self,
instance: &str,
func: &str,
paths: &[&[Option<usize>]],
paths: impl Into<Arc<[P]>> + Send + Sync + 'static,
) -> anyhow::Result<
impl Stream<
Item = anyhow::Result<(
Self::Context,
wrpc_transport::Invocation<Self::Outgoing, Self::Incoming, Self::Session>,
)>,
>,
Item = anyhow::Result<(
Self::Context,
wrpc_transport::Invocation<Self::Outgoing, Self::Incoming, Self::Session>,
)>,
> + 'static,
> {
let sub = self
.nats
.subscribe(invocation_subject(&self.prefix, instance, func))
.await?;
let paths = paths.into();
let nats = Arc::clone(&self.nats);
Ok(sub.then(
|Message {
reply: tx,
payload,
headers,
..
}| async {
let tx = tx.context("peer did not specify a reply subject")?;
let rx = self.nats.new_inbox();
let (param_rx, error_rx, nested) = try_join!(
async {
self.nats
.subscribe(Subject::from(param_subject(&rx)))
.await
.context("failed to subscribe on parameter subject")
},
async {
self.nats
.subscribe(Subject::from(error_subject(&rx)))
.await
.context("failed to subscribe on error subject")
},
futures::future::try_join_all(paths.iter().map(|path| async {
self.nats
.subscribe(Subject::from(subscribe_path(&rx, path)))
.await
.context("failed to subscribe on nested parameter subject")
}))
)?;
let nested: SubscriberTree = zip(paths.iter(), nested).collect();
ensure!(
paths.is_empty() == nested.is_empty(),
"failed to construct subscription tree"
);
self.nats
.publish_with_reply(tx.clone(), rx, Bytes::default())
.await
.context("failed to publish handshake accept")?;
let result_tx = Subject::from(result_subject(&tx));
let error_tx = Subject::from(error_subject(&tx));
Ok((
headers,
wrpc_transport::Invocation {
outgoing: SubjectWriter::new(
Arc::clone(&self.nats),
result_tx.clone(),
self.nats.publish_sink(result_tx),
),
incoming: Reader {
buffer: payload,
incoming: param_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
move |Message {
reply: tx,
payload,
headers,
..
}| {
let nats = Arc::clone(&nats);
let paths = Arc::clone(&paths);
async move {
let tx = tx.context("peer did not specify a reply subject")?;
let rx = nats.new_inbox();
let (param_rx, error_rx, nested) = try_join!(
async {
nats.subscribe(Subject::from(param_subject(&rx)))
.await
.context("failed to subscribe on parameter subject")
},
session: Session {
async {
nats.subscribe(Subject::from(error_subject(&rx)))
.await
.context("failed to subscribe on error subject")
},
futures::future::try_join_all(paths.iter().map(|path| async {
nats.subscribe(Subject::from(subscribe_path(&rx, path.as_ref())))
.await
.context("failed to subscribe on nested parameter subject")
}))
)?;
let nested: SubscriberTree = zip(paths.iter(), nested).collect();
ensure!(
paths.is_empty() == nested.is_empty(),
"failed to construct subscription tree"
);
nats.publish_with_reply(tx.clone(), rx, Bytes::default())
.await
.context("failed to publish handshake accept")?;
let result_tx = Subject::from(result_subject(&tx));
let error_tx = Subject::from(error_subject(&tx));
Ok((
headers,
wrpc_transport::Invocation {
outgoing: SubjectWriter::new(
Arc::clone(&self.nats),
error_tx.clone(),
self.nats.publish_sink(error_tx),
Arc::clone(&nats),
result_tx.clone(),
nats.publish_sink(result_tx),
),
incoming: error_rx,
incoming: Reader {
buffer: payload,
incoming: param_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
},
session: Session {
outgoing: SubjectWriter::new(
Arc::clone(&nats),
error_tx.clone(),
nats.publish_sink(error_tx),
),
incoming: error_rx,
},
},
},
))
))
}
},
))
}
Expand Down
24 changes: 13 additions & 11 deletions crates/transport-quic/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::borrow::Borrow;
use core::fmt::Display;
use core::future::Future as _;
use core::net::SocketAddr;
Expand Down Expand Up @@ -116,12 +115,12 @@ impl<'a> From<(&'a [Option<usize>], oneshot::Sender<RecvStream>)> for IndexTree
}
}

impl<'a, P: Borrow<&'a [Option<usize>]>> FromIterator<P> for IndexTree {
impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTree {
fn from_iter<T: IntoIterator<Item = P>>(iter: T) -> Self {
let mut root = Self::Empty;
for path in iter {
let (tx, rx) = oneshot::channel();
if !root.insert(path.borrow(), Some(tx), Some(rx)) {
if !root.insert(path.as_ref(), Some(tx), Some(rx)) {
return Self::Empty;
}
}
Expand Down Expand Up @@ -732,14 +731,14 @@ impl wrpc_transport::Invoke for Client {
type Outgoing = Outgoing;
type Incoming = Incoming;

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace", skip(self, paths))]
async fn invoke(
&self,
cx: Self::Context,
instance: &str,
func: &str,
params: Bytes,
paths: &[&[Option<usize>]],
paths: &[impl AsRef<[Option<usize>]> + Send + Sync],
) -> anyhow::Result<Invocation> {
let san = san(instance, func);
trace!(?san, "establishing connection");
Expand Down Expand Up @@ -812,7 +811,7 @@ impl wrpc_transport::Invoke for Client {
#[instrument(level = "trace", skip_all)]
async fn serve_connection(
conn: Connection,
paths: &[&[Option<usize>]],
paths: &[impl AsRef<[Option<usize>]>],
) -> anyhow::Result<Invocation> {
let ((ret_tx, param_rx), res_rx, res_tx) = try_join!(
async {
Expand Down Expand Up @@ -883,13 +882,14 @@ impl wrpc_transport::Serve for Server {
type Outgoing = Outgoing;
type Incoming = Incoming;

#[instrument(level = "trace", skip(self))]
async fn serve(
#[instrument(level = "trace", skip(self, paths))]
async fn serve<P: AsRef<[Option<usize>]> + Send + Sync + 'static>(
&self,
instance: &str,
func: &str,
paths: &[&[Option<usize>]],
) -> anyhow::Result<impl Stream<Item = anyhow::Result<(Self::Context, Invocation)>>> {
paths: impl Into<Arc<[P]>> + Send + Sync + 'static,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<(Self::Context, Invocation)>> + 'static>
{
let san = san(instance, func);
let (tx, rx) = mpsc::channel(1024);
let mut handlers = self.0.lock().await;
Expand All @@ -902,9 +902,11 @@ impl wrpc_transport::Serve for Server {
}
}
let span = tracing::Span::current();
let paths = paths.into();
Ok(ReceiverStream::new(rx).then(move |conn| {
let paths = Arc::clone(&paths);
async move {
let invocation = serve_connection(conn, paths).await?;
let invocation = serve_connection(conn, &paths).await?;
Ok(((), invocation))
}
.instrument(span.clone())
Expand Down
2 changes: 1 addition & 1 deletion crates/transport-quic/tests/loopback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn loopback() -> anyhow::Result<()> {
let clt = Client::new(clt_ep, (Ipv4Addr::LOCALHOST, srv_addr.port()));
let srv = Server::default();
let invocations = srv
.serve("foo", "bar", &[&[Some(42), Some(0)]])
.serve("foo", "bar", [[Some(42), Some(0)]])
.await
.context("failed to serve `foo.bar`")?;
let mut invocations = pin!(invocations);
Expand Down
Loading

0 comments on commit a1ea004

Please sign in to comment.