Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(go,rs): nested async support #346

Merged
merged 17 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ wasmtime = { version = "25", default-features = false }
wasmtime-wasi = { version = "25", default-features = false }
wit-bindgen = { version = "0.32", default-features = false }
wit-bindgen-core = { version = "0.32", default-features = false }
wit-bindgen-wrpc = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc = { version = "0.7", default-features = false, path = "./crates/wit-bindgen" }
wit-bindgen-wrpc-go = { version = "0.9", default-features = false, path = "./crates/wit-bindgen-go" }
wit-bindgen-wrpc-rust = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.6.5", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-bindgen-wrpc-rust = { version = "0.7", default-features = false, path = "./crates/wit-bindgen-rust" }
wit-bindgen-wrpc-rust-macro = { version = "0.7", default-features = false, path = "./crates/wit-bindgen-rust-macro" }
wit-component = { version = "0.217", default-features = false }
wit-parser = { version = "0.217", default-features = false }
wrpc-cli = { version = "0.3", path = "./crates/cli", default-features = false }
wrpc-introspect = { version = "0.3", default-features = false, path = "./crates/introspect" }
wrpc-introspect = { version = "0.4", default-features = false, path = "./crates/introspect" }
wrpc-runtime-wasmtime = { version = "0.22", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-transport = { version = "0.26.8", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.23.2", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.1.2", path = "./crates/transport-quic", default-features = false }
wrpc-transport = { version = "0.27", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.24", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.2", path = "./crates/transport-quic", default-features = false }
wrpc-wasmtime-nats-cli = { version = "0.8", path = "./crates/wasmtime-nats-cli", default-features = false }
2 changes: 1 addition & 1 deletion crates/introspect/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-introspect"
version = "0.3.0"
version = "0.4.0"
description = "Component type introspection for wRPC"

authors.workspace = true
Expand Down
9 changes: 1 addition & 8 deletions crates/introspect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,7 @@ pub fn async_paths_tyid(resolve: &Resolve, id: TypeId) -> (BTreeSet<VecDeque<Opt
TypeDefKind::Future(ty) => {
let mut paths = BTreeSet::default();
if let Some(ty) = ty {
let (nested, fut) = async_paths_ty(resolve, ty);
for mut path in nested {
path.push_front(Some(0));
paths.insert(path);
}
if fut {
paths.insert(vec![Some(0)].into());
}
(paths, _) = async_paths_ty(resolve, ty);
}
(paths, true)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transport-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport-nats"
version = "0.23.2"
version = "0.24.0"
description = "wRPC NATS transport"

authors.workspace = true
Expand Down
27 changes: 24 additions & 3 deletions crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,24 +308,29 @@ pub struct Reader {
buffer: Bytes,
incoming: Subscriber,
nested: Arc<std::sync::Mutex<SubscriberTree>>,
path: Box<[usize]>,
}

impl wrpc_transport::Index<Self> for Reader {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
trace!("locking index tree");
let mut nested = self
.nested
.lock()
.map_err(|err| anyhow!(err.to_string()).context("failed to lock map"))?;
trace!("taking index subscription");
let mut p = self.path.to_vec();
p.extend_from_slice(path);
let incoming = nested
.take(path)
.with_context(|| format!("unknown subscription for path `{path:?}`"))?;
.take(&p)
.with_context(|| format!("unknown subscription for path `{p:?}`"))?;
Ok(Self {
buffer: Bytes::default(),
incoming,
nested: Arc::clone(&self.nested),
path: p.into_boxed_slice(),
})
}
}
Expand Down Expand Up @@ -400,6 +405,7 @@ impl SubjectWriter {
impl wrpc_transport::Index<Self> for SubjectWriter {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
let tx = Subject::from(index_path(self.tx.as_str(), path));
Ok(Self {
nats: self.nats.clone(),
Expand Down Expand Up @@ -624,6 +630,7 @@ impl RootParamWriter {
impl wrpc_transport::Index<IndexedParamWriter> for RootParamWriter {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<IndexedParamWriter> {
ensure!(!path.is_empty());
match self {
Self::Corrupted => Err(anyhow!(corrupted_memory_error())),
Self::Handshaking { indexed, .. } => {
Expand Down Expand Up @@ -741,6 +748,7 @@ impl IndexedParamWriter {
impl wrpc_transport::Index<Self> for IndexedParamWriter {
#[instrument(level = "trace", skip_all)]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
match self {
Self::Corrupted => Err(anyhow!(corrupted_memory_error())),
Self::Handshaking { indexed, .. } => {
Expand Down Expand Up @@ -814,6 +822,7 @@ pub enum ParamWriter {

impl wrpc_transport::Index<Self> for ParamWriter {
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
match self {
ParamWriter::Root(w) => w.index(path),
ParamWriter::Nested(w) => w.index(path),
Expand Down Expand Up @@ -871,20 +880,30 @@ impl wrpc_transport::Invoke for Client {
let paths = paths.as_ref();
let (result_rx, handshake_rx, nested) = try_join!(
async {
trace!(
subject = result_rx.as_str(),
"subscribing on result subject"
);
self.nats
.subscribe(result_rx.clone())
.await
.context("failed to subscribe on result subject")
},
async {
trace!(subject = rx.as_str(), "subscribing on handshake subject");
self.nats
.subscribe(rx.clone())
.await
.context("failed to subscribe on handshake subject")
},
try_join_all(paths.iter().map(|path| async {
let rx = Subject::from(subscribe_path(&result_rx, path.as_ref()));
trace!(
subject = rx.as_str(),
"subscribing on nested result subject"
);
self.nats
.subscribe(Subject::from(subscribe_path(&result_rx, path.as_ref())))
.subscribe(rx)
.await
.context("failed to subscribe on nested result subject")
}))
Expand Down Expand Up @@ -944,6 +963,7 @@ impl wrpc_transport::Invoke for Client {
buffer: Bytes::default(),
incoming: result_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
path: Box::default(),
},
))
}
Expand Down Expand Up @@ -1024,6 +1044,7 @@ impl wrpc_transport::Serve for Client {
buffer: payload,
incoming: param_rx,
nested: Arc::new(std::sync::Mutex::new(nested)),
path: Box::default(),
},
))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transport-quic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport-quic"
version = "0.1.2"
version = "0.2.0"
description = "wRPC QUIC transport"

authors.workspace = true
Expand Down
18 changes: 10 additions & 8 deletions crates/transport-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ pin_project! {
impl wrpc_transport::Index<Self> for Incoming {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
match self {
Self::Accepting {
index,
Expand All @@ -404,7 +405,7 @@ impl wrpc_transport::Index<Self> for Incoming {
let mut lock = index.lock().map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, err.to_string())
})?;
trace!("taking index subscription");
trace!(?path, "taking index subscription");
let rx = lock.take_rx(&path).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::NotFound,
Expand Down Expand Up @@ -450,7 +451,7 @@ impl AsyncRead for Incoming {
IncomingProj::Active { rx, path, .. } => {
trace!(?path, "reading buffer");
ready!(AsyncRead::poll_read(rx, cx, buf))?;
trace!(buf = ?buf.filled(), "read from buffer");
trace!(?path, buf = ?buf.filled(), "read from buffer");
Poll::Ready(Ok(()))
}
}
Expand Down Expand Up @@ -482,6 +483,7 @@ pin_project! {
impl wrpc_transport::Index<Self> for Outgoing {
#[instrument(level = "trace", skip(self))]
fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
ensure!(!path.is_empty());
let mut header = BytesMut::with_capacity(path.len().saturating_add(5));
let depth = path.len();
let n = u32::try_from(depth)
Expand Down Expand Up @@ -578,8 +580,8 @@ impl AsyncWrite for Outgoing {
ready!(self.as_mut().poll_flush_header(cx))?;
match self.as_mut().project() {
OutgoingProj::Opening { .. } => Poll::Ready(Err(corrupted_memory_error())),
OutgoingProj::Active { tx, .. } => {
trace!("writing buffer");
OutgoingProj::Active { tx, path, .. } => {
trace!(?path, ?buf, "writing buffer");
AsyncWrite::poll_write(tx, cx, buf)
}
}
Expand All @@ -590,8 +592,8 @@ impl AsyncWrite for Outgoing {
ready!(self.as_mut().poll_flush_header(cx))?;
match self.as_mut().project() {
OutgoingProj::Opening { .. } => Poll::Ready(Err(corrupted_memory_error())),
OutgoingProj::Active { tx, .. } => {
trace!("flushing stream");
OutgoingProj::Active { tx, path, .. } => {
trace!(?path, "flushing stream");
tx.poll_flush(cx)
}
}
Expand All @@ -602,8 +604,8 @@ impl AsyncWrite for Outgoing {
ready!(self.as_mut().poll_flush_header(cx))?;
match self.as_mut().project() {
OutgoingProj::Opening { .. } => Poll::Ready(Err(corrupted_memory_error())),
OutgoingProj::Active { tx, .. } => {
trace!("shutting down stream");
OutgoingProj::Active { tx, path, .. } => {
trace!(?path, "shutting down stream");
tx.poll_shutdown(cx)
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport"
version = "0.26.8"
version = "0.27.0"
description = "wRPC core transport functionality"

authors.workspace = true
Expand Down
14 changes: 10 additions & 4 deletions crates/transport/src/invoke.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::future::Future;
use core::mem;
use core::pin::pin;
use core::time::Duration;

Expand All @@ -10,7 +11,7 @@ use tokio::{select, try_join};
use tokio_util::codec::{Encoder as _, FramedRead};
use tracing::{debug, instrument, trace, Instrument as _};

use crate::{Deferred as _, Index, TupleDecode, TupleEncode};
use crate::{Deferred as _, Incoming, Index, TupleDecode, TupleEncode};

/// Client-side handle to a wRPC transport
pub trait Invoke: Send + Sync {
Expand Down Expand Up @@ -181,7 +182,7 @@ pub trait InvokeExt: Invoke {
tokio::spawn(
async {
debug!("transmitting async parameters");
tx(outgoing.into(), Vec::with_capacity(8))
tx(outgoing, Vec::default())
.await
.context("failed to write async parameters")
}
Expand Down Expand Up @@ -213,7 +214,12 @@ pub trait InvokeExt: Invoke {
results.await?
};
trace!("received sync results");
let buffer = mem::take(dec.read_buffer_mut());
let rx = dec.decoder_mut().take_deferred();
let incoming = Incoming {
buffer,
inner: dec.into_inner(),
};
Ok((
results,
(tx.is_some() || rx.is_some()).then_some(
Expand All @@ -223,7 +229,7 @@ pub trait InvokeExt: Invoke {
try_join!(
async {
debug!("receiving async results");
rx(dec.into_inner().into(), Vec::with_capacity(8))
rx(incoming, Vec::default())
.await
.context("receiving async results failed")
},
Expand All @@ -237,7 +243,7 @@ pub trait InvokeExt: Invoke {
}
(None, Some(rx)) => {
debug!("receiving async results");
rx(dec.into_inner().into(), Vec::with_capacity(8))
rx(incoming, Vec::default())
.await
.context("receiving async results failed")?;
}
Expand Down
Loading
Loading