Skip to content

Commit

Permalink
fix(rs): allow indexing partial paths
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Oct 1, 2024
1 parent 0357e83 commit 1471710
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 142 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,6 @@ wrpc-cli = { version = "0.3", path = "./crates/cli", default-features = false }
wrpc-introspect = { version = "0.4.1", default-features = false, path = "./crates/introspect" }
wrpc-runtime-wasmtime = { version = "0.23", path = "./crates/runtime-wasmtime", default-features = false }
wrpc-transport = { version = "0.27", path = "./crates/transport", default-features = false }
wrpc-transport-nats = { version = "0.24", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.2", path = "./crates/transport-quic", default-features = false }
wrpc-transport-nats = { version = "0.24.1", path = "./crates/transport-nats", default-features = false }
wrpc-transport-quic = { version = "0.2.1", path = "./crates/transport-quic", default-features = false }
wrpc-wasmtime-nats-cli = { version = "0.9", path = "./crates/wasmtime-nats-cli", default-features = false }
2 changes: 1 addition & 1 deletion crates/transport-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport-nats"
version = "0.24.0"
version = "0.24.1"
description = "wRPC NATS transport"

authors.workspace = true
Expand Down
18 changes: 11 additions & 7 deletions crates/transport-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl SubscriberTree {

pub struct Reader {
buffer: Bytes,
incoming: Subscriber,
incoming: Option<Subscriber>,
nested: Arc<std::sync::Mutex<SubscriberTree>>,
path: Box<[usize]>,
}
Expand All @@ -323,9 +323,7 @@ impl wrpc_transport::Index<Self> for Reader {
trace!("taking index subscription");
let mut p = self.path.to_vec();
p.extend_from_slice(path);
let incoming = nested
.take(&p)
.with_context(|| format!("unknown subscription for path `{p:?}`"))?;
let incoming = nested.take(&p);
Ok(Self {
buffer: Bytes::default(),
incoming,
Expand Down Expand Up @@ -358,8 +356,14 @@ impl AsyncRead for Reader {
}
return Poll::Ready(Ok(()));
}
let Some(incoming) = self.incoming.as_mut() else {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("subscription not found for path {:?}", self.path),
)));
};
trace!("polling for next message");
match self.incoming.poll_next_unpin(cx) {
match incoming.poll_next_unpin(cx) {
Poll::Ready(Some(Message { mut payload, .. })) => {
trace!(?payload, "received message");
if payload.is_empty() {
Expand Down Expand Up @@ -961,7 +965,7 @@ impl wrpc_transport::Invoke for Client {
)),
Reader {
buffer: Bytes::default(),
incoming: result_rx,
incoming: Some(result_rx),
nested: Arc::new(std::sync::Mutex::new(nested)),
path: Box::default(),
},
Expand Down Expand Up @@ -1042,7 +1046,7 @@ impl wrpc_transport::Serve for Client {
SubjectWriter::new((*nats).clone(), Subject::from(result_subject(&tx))),
Reader {
buffer: payload,
incoming: param_rx,
incoming: Some(param_rx),
nested: Arc::new(std::sync::Mutex::new(nested)),
path: Box::default(),
},
Expand Down
2 changes: 1 addition & 1 deletion crates/transport-quic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wrpc-transport-quic"
version = "0.2.0"
version = "0.2.1"
description = "wRPC QUIC transport"

authors.workspace = true
Expand Down
35 changes: 18 additions & 17 deletions crates/transport-quic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTree {
}

impl IndexTree {
#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn take_rx(&mut self, path: &[usize]) -> Option<oneshot::Receiver<RecvStream>> {
let Some((i, path)) = path.split_first() else {
return match self {
Expand Down Expand Up @@ -168,7 +168,7 @@ impl IndexTree {
}
}

#[instrument(level = "trace", skip(self))]
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn take_tx(&mut self, path: &[usize]) -> Option<oneshot::Sender<RecvStream>> {
let Some((i, path)) = path.split_first() else {
return match self {
Expand Down Expand Up @@ -204,7 +204,7 @@ impl IndexTree {

/// Inserts `sender` and `receiver` under a `path` - returns `false` if it failed and `true` if it succeeded.
/// Tree state after `false` is returned is undefined
#[instrument(level = "trace", skip(self, sender, receiver), ret)]
#[instrument(level = "trace", skip(self, sender, receiver), ret(level = "trace"))]
fn insert(
&mut self,
path: &[Option<usize>],
Expand Down Expand Up @@ -366,7 +366,7 @@ pin_project! {
index: Arc<std::sync::Mutex<IndexTree>>,
path: Arc<[usize]>,
#[pin]
rx: oneshot::Receiver<RecvStream>,
rx: Option<oneshot::Receiver<RecvStream>>,
io: Arc<JoinSet<std::io::Result<()>>>,
},
Active {
Expand Down Expand Up @@ -406,12 +406,7 @@ impl wrpc_transport::Index<Self> for Incoming {
std::io::Error::new(std::io::ErrorKind::Other, err.to_string())
})?;
trace!(?path, "taking index subscription");
let rx = lock.take_rx(&path).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("`{path:?}` subscription not found"),
)
})?;
let rx = lock.take_rx(&path);
Ok(Self::Accepting {
index: Arc::clone(index),
path,
Expand All @@ -424,7 +419,7 @@ impl wrpc_transport::Index<Self> for Incoming {
}

impl AsyncRead for Incoming {
#[instrument(level = "trace", skip_all, ret)]
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -434,11 +429,17 @@ impl AsyncRead for Incoming {
IncomingProj::Accepting {
index,
path,
mut rx,
rx,
io,
} => {
let Some(rx) = rx.as_pin_mut() else {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("subscription not found for path {:?}", path),
)));
};
trace!(?path, "polling channel");
let rx = ready!(rx.as_mut().poll(cx))
let rx = ready!(rx.poll(cx))
.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
*self = Self::Active {
index: Arc::clone(index),
Expand Down Expand Up @@ -536,7 +537,7 @@ fn poll_write_header(
}

impl Outgoing {
#[instrument(level = "trace", skip_all, ret)]
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
fn poll_flush_header(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -571,7 +572,7 @@ fn corrupted_memory_error() -> std::io::Error {
}

impl AsyncWrite for Outgoing {
#[instrument(level = "trace", skip_all, ret, fields(buf = format!("{buf:02x?}")))]
#[instrument(level = "trace", skip_all, fields(buf = format!("{buf:02x?}")), ret(level = "trace"))]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -587,7 +588,7 @@ impl AsyncWrite for Outgoing {
}
}

#[instrument(level = "trace", skip_all, ret)]
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
ready!(self.as_mut().poll_flush_header(cx))?;
match self.as_mut().project() {
Expand All @@ -599,7 +600,7 @@ impl AsyncWrite for Outgoing {
}
}

#[instrument(level = "trace", skip_all, ret)]
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
ready!(self.as_mut().poll_flush_header(cx))?;
match self.as_mut().project() {
Expand Down
Loading

0 comments on commit 1471710

Please sign in to comment.