From 6e725faaef66c04c93688c8dd664a2a628b0f2bc Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Fri, 11 Oct 2024 19:22:22 +0200 Subject: [PATCH] doc: add `wrpc-transport` docs Signed-off-by: Roman Volosatovs --- SPEC.md | 2 +- crates/transport/src/frame/codec.rs | 3 ++ crates/transport/src/frame/conn/accept.rs | 5 +++ crates/transport/src/frame/conn/client.rs | 1 + crates/transport/src/frame/conn/mod.rs | 7 +++- crates/transport/src/frame/conn/server.rs | 13 +++++++- crates/transport/src/frame/mod.rs | 6 ++++ crates/transport/src/frame/tcp.rs | 7 ++++ crates/transport/src/frame/unix.rs | 7 ++++ crates/transport/src/invoke.rs | 9 +++++ crates/transport/src/lib.rs | 27 ++++++++++++--- crates/transport/src/serve.rs | 3 ++ crates/transport/src/value.rs | 40 ++++++++++++++++++++++- 13 files changed, 122 insertions(+), 8 deletions(-) diff --git a/SPEC.md b/SPEC.md index 80a3afd7..9a49aea1 100644 --- a/SPEC.md +++ b/SPEC.md @@ -2,7 +2,7 @@ wRPC is a transport-agnostic protocol designed for asynchronous transmit of WIT function call invocations over network and other means of communication. -wRPC follows client-server model, where peers may *serve* function (servers) and method calls invoked by the other peers (clients). +wRPC follows client-server model, where peers (servers) may *serve* function and method calls invoked by the other peers (clients). wRPC relies on [component model value definition encoding] for data encoding on the wire. diff --git a/crates/transport/src/frame/codec.rs b/crates/transport/src/frame/codec.rs index 578baa09..19de2049 100644 --- a/crates/transport/src/frame/codec.rs +++ b/crates/transport/src/frame/codec.rs @@ -6,6 +6,7 @@ use wasm_tokio::{Leb128DecoderU32, Leb128DecoderU64, Leb128Encoder}; use super::{Frame, FrameRef}; +/// [Frame] decoder pub struct Decoder { path: Option>, path_cap: usize, @@ -15,6 +16,7 @@ pub struct Decoder { } impl Decoder { + /// Construct a new [Frame] decoder #[must_use] pub fn new(max_depth: u32, max_size: u64) -> Self { Self { @@ -125,6 +127,7 @@ impl tokio_util::codec::Decoder for Decoder { } } +/// [Frame] encoder pub struct Encoder; impl tokio_util::codec::Encoder> for Encoder { diff --git a/crates/transport/src/frame/conn/accept.rs b/crates/transport/src/frame/conn/accept.rs index 81cbb9ef..117fc0e5 100644 --- a/crates/transport/src/frame/conn/accept.rs +++ b/crates/transport/src/frame/conn/accept.rs @@ -3,6 +3,7 @@ use core::ops::{Deref, DerefMut}; use tokio::io::{AsyncRead, AsyncWrite}; +/// Accepts connections on a transport pub trait Accept { /// Transport-specific invocation context type Context: Send + Sync + 'static; @@ -13,11 +14,13 @@ pub trait Accept { /// Incoming byte stream type Incoming: AsyncRead + Send + Sync + Unpin + 'static; + /// Accept a connection returning a pair of streams and connection context fn accept( &self, ) -> impl Future>; } +/// Wrapper returned by [`AcceptExt::map_context`] pub struct AcceptMapContext { inner: T, f: F, @@ -37,7 +40,9 @@ impl DerefMut for AcceptMapContext { } } +/// Extension trait for [Accept] pub trait AcceptExt: Accept + Sized { + /// Maps [`Self::Context`](Accept::Context) to a type `T` using `F` fn map_context T>(self, f: F) -> AcceptMapContext { AcceptMapContext { inner: self, f } } diff --git a/crates/transport/src/frame/conn/client.rs b/crates/transport/src/frame/conn/client.rs index ebb09614..c0401e5e 100644 --- a/crates/transport/src/frame/conn/client.rs +++ b/crates/transport/src/frame/conn/client.rs @@ -15,6 +15,7 @@ use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes}; use crate::frame::conn::{egress, ingress, Incoming, Outgoing}; use crate::frame::PROTOCOL; +/// Invoke function `func` on instance `instance` #[instrument(level = "trace", skip_all)] pub async fn invoke( mut tx: O, diff --git a/crates/transport/src/frame/conn/mod.rs b/crates/transport/src/frame/conn/mod.rs index f715896e..d3b70c8a 100644 --- a/crates/transport/src/frame/conn/mod.rs +++ b/crates/transport/src/frame/conn/mod.rs @@ -28,8 +28,9 @@ pub use accept::*; pub use client::*; pub use server::*; +/// Index trie containing async stream subscriptions #[derive(Default)] -pub enum IndexTrie { +enum IndexTrie { #[default] Empty, Leaf { @@ -123,6 +124,7 @@ impl]>> FromIterator

for IndexTrie { } impl IndexTrie { + /// Takes the receiver #[instrument(level = "trace", skip(self), ret(level = "trace"))] fn take_rx(&mut self, path: &[usize]) -> Option>> { let Some((i, path)) = path.split_first() else { @@ -157,6 +159,7 @@ impl IndexTrie { } } + /// Gets a sender #[instrument(level = "trace", skip(self), ret(level = "trace"))] fn get_tx(&mut self, path: &[usize]) -> Option>> { let Some((i, path)) = path.split_first() else { @@ -269,6 +272,7 @@ impl IndexTrie { } pin_project! { + /// Incoming framed stream #[project = IncomingProj] pub struct Incoming { #[pin] @@ -332,6 +336,7 @@ impl AsyncRead for Incoming { } pin_project! { + /// Outgoing framed stream #[project = OutgoingProj] pub struct Outgoing { #[pin] diff --git a/crates/transport/src/frame/conn/server.rs b/crates/transport/src/frame/conn/server.rs index e720f683..337cf954 100644 --- a/crates/transport/src/frame/conn/server.rs +++ b/crates/transport/src/frame/conn/server.rs @@ -19,6 +19,7 @@ use crate::frame::conn::{egress, ingress, Accept}; use crate::frame::{Incoming, Outgoing}; use crate::Serve; +/// wRPC server for framed transports pub struct Server(Mutex>>>); impl Default for Server { @@ -27,10 +28,20 @@ impl Default for Server { } } +/// Error returned by [`Server::accept`] pub enum AcceptError { + /// I/O error IO(std::io::Error), + /// Protocol version is not supported UnsupportedVersion(u8), - UnhandledFunction { instance: String, name: String }, + /// Function was not handled + UnhandledFunction { + /// Instance + instance: String, + /// Function name + name: String, + }, + /// Message sending failed Send(mpsc::error::SendError<(C, I, O)>), } diff --git a/crates/transport/src/frame/mod.rs b/crates/transport/src/frame/mod.rs index e52017fa..73b204f5 100644 --- a/crates/transport/src/frame/mod.rs +++ b/crates/transport/src/frame/mod.rs @@ -1,3 +1,5 @@ +//! wRPC transport stream framing + use std::sync::Arc; use bytes::Bytes; @@ -19,14 +21,18 @@ pub const PROTOCOL: u8 = 0; /// Owned wRPC frame #[derive(Clone, Debug, Eq, PartialEq)] pub struct Frame { + /// Frame path pub path: Arc<[usize]>, + /// Frame data pub data: Bytes, } /// wRPC frame reference #[derive(Clone, Debug, Eq, PartialEq)] pub struct FrameRef<'a> { + /// Frame path pub path: &'a [usize], + /// Frame data pub data: &'a [u8], } diff --git a/crates/transport/src/frame/tcp.rs b/crates/transport/src/frame/tcp.rs index 68d7aecc..fed79f94 100644 --- a/crates/transport/src/frame/tcp.rs +++ b/crates/transport/src/frame/tcp.rs @@ -1,3 +1,5 @@ +//! TCP transport + use core::net::SocketAddr; use anyhow::{bail, Context as _}; @@ -9,8 +11,13 @@ use tracing::instrument; use crate::frame::{invoke, Accept, Incoming, Outgoing}; use crate::Invoke; +/// [Invoke] implementation in terms of a single [TcpStream] +/// +/// [`Invoke::invoke`] can only be called once on [Invocation], +/// repeated calls with return an error pub struct Invocation(std::sync::Mutex>); +/// [Invoke] implementation of a TCP transport #[derive(Clone, Debug)] pub struct Client(T); diff --git a/crates/transport/src/frame/unix.rs b/crates/transport/src/frame/unix.rs index 01354da7..6fd744ee 100644 --- a/crates/transport/src/frame/unix.rs +++ b/crates/transport/src/frame/unix.rs @@ -1,3 +1,5 @@ +//! Unix domain socket transport + use std::path::{Path, PathBuf}; use anyhow::{bail, Context as _}; @@ -9,8 +11,13 @@ use tracing::instrument; use crate::frame::{invoke, Accept, Incoming, Outgoing}; use crate::Invoke; +/// [Invoke] implementation in terms of a single [UnixStream] +/// +/// [`Invoke::invoke`] can only be called once on [Invocation], +/// repeated calls with return an error pub struct Invocation(std::sync::Mutex>); +/// [Invoke] implementation of a Unix domain socket transport #[derive(Clone, Debug)] pub struct Client(T); diff --git a/crates/transport/src/invoke.rs b/crates/transport/src/invoke.rs index 77bf54cf..c177fc20 100644 --- a/crates/transport/src/invoke.rs +++ b/crates/transport/src/invoke.rs @@ -1,3 +1,5 @@ +//! wRPC transport client handle + use core::future::Future; use core::mem; use core::pin::pin; @@ -76,9 +78,12 @@ pub trait Invoke: Send + Sync { P: AsRef<[Option]> + Send + Sync; } +/// Wrapper struct returned by [InvokeExt::timeout] #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct Timeout<'a, T: ?Sized> { + /// Inner [Invoke] pub inner: &'a T, + /// Invocation timeout pub timeout: Duration, } @@ -108,9 +113,12 @@ impl Invoke for Timeout<'_, T> { } } +/// Wrapper struct returned by [InvokeExt::timeout_owned] #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct TimeoutOwned { + /// Inner [Invoke] pub inner: T, + /// Invocation timeout pub timeout: Duration, } @@ -138,6 +146,7 @@ impl Invoke for TimeoutOwned { } } +/// Extension trait for [Invoke] pub trait InvokeExt: Invoke { /// Invoke function `func` on instance `instance` using typed `Params` and `Results` #[instrument(level = "trace", skip(self, cx, params, paths))] diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index bc685322..b0bdfd6c 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -1,4 +1,18 @@ #![allow(clippy::type_complexity)] +#![deny(missing_docs)] + +//! wRPC transport abstractions, codec and framing +//! +//! wRPC is an RPC framework based on [WIT](https://component-model.bytecodealliance.org/design/wit.html). +//! It follows client-server model, where peers (servers) may serve function and method calls invoked by the other peers (clients). +//! +//! The two main abstractions on top of which wRPC is built are: +//! - [Invoke] - the client-side handle to a wRPC transport, allowing clients to *invoke* WIT functions over wRPC transport +//! - [Serve] - the server-side handle to a wRPC transport, allowing servers to *serve* WIT functions over wRPC transport +//! +//! Implementations of [Invoke] and [Serve] define transport-specific, multiplexed bidirectional byte stream types: +//! - [`Invoke::Incoming`] and [`Serve::Incoming`] represent the stream *incoming* from a peer. +//! - [`Invoke::Outgoing`] and [`Serve::Outgoing`] represent the stream *outgoing* to a peer. pub mod frame; pub mod invoke; @@ -27,20 +41,25 @@ use bytes::BytesMut; use tokio::io::{AsyncRead, ReadBuf}; use tracing::trace; +/// Internal workaround trait +/// +/// This is an internal trait used as a workaround for +/// https://github.com/rust-lang/rust/issues/63033 #[doc(hidden)] -// This is an internal trait used as a workaround for -// https://github.com/rust-lang/rust/issues/63033 pub trait Captures<'a> {} impl<'a, T: ?Sized> Captures<'a> for T {} -/// `Index` implementations are capable of multiplexing underlying connections using a particular -/// structural `path` +/// Multiplexes streams +/// +/// Implementations of this trait define multiplexing for underlying connections +/// using a particular structural `path` pub trait Index { /// Index the entity using a structural `path` fn index(&self, path: &[usize]) -> anyhow::Result; } +/// Buffered incoming stream used for decoding values pub struct Incoming { buffer: BytesMut, inner: T, diff --git a/crates/transport/src/serve.rs b/crates/transport/src/serve.rs index 298f277b..1e698dab 100644 --- a/crates/transport/src/serve.rs +++ b/crates/transport/src/serve.rs @@ -1,3 +1,5 @@ +//! wRPC transport server handle + use core::future::Future; use core::mem; use core::pin::Pin; @@ -38,6 +40,7 @@ pub trait Serve: Sync { > + Send; } +/// Extension trait for [Serve] pub trait ServeExt: Serve { /// Serve function `func` from instance `instance` using typed `Params` and `Results` #[instrument(level = "trace", skip(self, paths))] diff --git a/crates/transport/src/value.rs b/crates/transport/src/value.rs index 0d1aefa3..b6a87dd4 100644 --- a/crates/transport/src/value.rs +++ b/crates/transport/src/value.rs @@ -33,6 +33,7 @@ use wasm_tokio::{ use crate::{Incoming, Index as _}; +/// Borrowed resource handle, represented as an opaque byte blob #[repr(transparent)] pub struct ResourceBorrow { repr: Bytes, @@ -96,11 +97,13 @@ impl Debug for ResourceBorrow { } impl ResourceBorrow { + /// Constructs a new borrowed resource handle pub fn new(repr: impl Into) -> Self { Self::from(repr.into()) } } +/// Owned resource handle, represented as an opaque byte blob #[repr(transparent)] pub struct ResourceOwn { repr: Bytes, @@ -173,10 +176,12 @@ impl Debug for ResourceOwn { } impl ResourceOwn { + /// Constructs a new owned resource handle pub fn new(repr: impl Into) -> Self { Self::from(repr.into()) } + /// Returns the owned handle as [ResourceBorrow] pub fn as_borrow(&self) -> ResourceBorrow { ResourceBorrow { repr: self.repr.clone(), @@ -185,11 +190,14 @@ impl ResourceOwn { } } +/// Deferred operation used for async value processing pub type DeferredFn = Box< dyn FnOnce(T, Vec) -> Pin> + Send>> + Send, >; +/// Handles async processing state for codecs pub trait Deferred { + /// Takes a deferred async processing operation, if any fn take_deferred(&mut self) -> Option>; } @@ -261,6 +269,10 @@ impl_deferred_sync!(CoreVecDecoder); impl_deferred_sync!(CoreVecDecoder); impl_deferred_sync!(CoreVecDecoder); +/// Codec for synchronous values +/// +/// This is a wrapper struct, which provides a no-op [Deferred] implementation +/// for any codec. pub struct SyncCodec(pub T); impl Deref for SyncCodec { @@ -327,7 +339,7 @@ where } #[instrument(level = "trace", skip(w, deferred))] -pub async fn handle_deferred(w: T, deferred: I, mut path: Vec) -> std::io::Result<()> +async fn handle_deferred(w: T, deferred: I, mut path: Vec) -> std::io::Result<()> where I: IntoIterator>>, I::IntoIter: ExactSizeIterator, @@ -346,9 +358,12 @@ where Ok(()) } +/// Defines value encoding pub trait Encode: Sized { + /// Encoder used to encode the value type Encoder: tokio_util::codec::Encoder + Deferred + Default + Send; + /// Convenience function for encoding a value #[instrument(level = "trace", skip(self, enc))] fn encode( self, @@ -360,6 +375,7 @@ pub trait Encode: Sized { Ok(enc.take_deferred()) } + /// Encode an iterator of owned values #[instrument(level = "trace", skip(items, enc))] fn encode_iter_own( items: I, @@ -387,6 +403,7 @@ pub trait Encode: Sized { } } + /// Encode an iterator of value references #[instrument(level = "trace", skip(items, enc))] fn encode_iter_ref<'a, I>( items: I, @@ -415,6 +432,7 @@ pub trait Encode: Sized { } } + /// Encode a list of owned values #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))] fn encode_list_own( items: Vec, @@ -431,6 +449,7 @@ pub trait Encode: Sized { Self::encode_iter_own(items, enc, dst) } + /// Encode a list of value references #[instrument(level = "trace", skip(items, enc), fields(ty = "list"))] fn encode_list_ref<'a>( items: &'a [Self], @@ -449,12 +468,15 @@ pub trait Encode: Sized { } } +/// Defines value decoding pub trait Decode: Sized { + /// Decoder used to decode value type Decoder: tokio_util::codec::Decoder + Deferred> + Default + Send + 'static; + /// Decoder used to decode lists of value type ListDecoder: tokio_util::codec::Decoder> + Default + 'static; } @@ -575,6 +597,7 @@ where type ListDecoder = ListDecoder; } +/// Encoder for `list` pub struct ListEncoder { deferred: Option>, } @@ -691,6 +714,7 @@ where type Encoder = ListEncoder; } +/// Decoder for `list` pub struct ListDecoder where T: tokio_util::codec::Decoder, @@ -705,6 +729,7 @@ impl ListDecoder where T: tokio_util::codec::Decoder, { + /// Constructs a new list decoder pub fn new(dec: T) -> Self { Self { dec, @@ -997,6 +1022,7 @@ impl<'b, T> Encode for &'b u8 { } } +/// Decoder for `list` #[derive(Debug, Default)] #[repr(transparent)] pub struct ListDecoderU8(CoreVecDecoderBytes); @@ -1053,6 +1079,7 @@ impl Decode for Bytes { type ListDecoder = CoreVecDecoder; } +/// Encoder for `resource` types #[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] #[repr(transparent)] pub struct ResourceEncoder; @@ -1109,6 +1136,7 @@ impl Encode for &ResourceBorrow { type Encoder = ResourceEncoder; } +/// Decoder for borrowed resource types #[derive(Debug)] #[repr(transparent)] pub struct ResourceBorrowDecoder { @@ -1153,6 +1181,7 @@ impl tokio_util::codec::Decoder for ResourceBorrowDecoder { } } +/// Decoder for owned resource types #[derive(Debug)] #[repr(transparent)] pub struct ResourceOwnDecoder { @@ -1197,6 +1226,7 @@ impl tokio_util::codec::Decoder for ResourceOwnDecoder { } } +/// Codec for `()` #[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] #[repr(transparent)] pub struct UnitCodec; @@ -1455,6 +1485,7 @@ impl_tuple_codec!( C0, C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15 ); +/// Encoder for `future` pub struct FutureEncoder { deferred: Option>, } @@ -1519,6 +1550,7 @@ where type Encoder = FutureEncoder; } +/// Decoder for `future` pub struct FutureDecoder where T: Decode, @@ -1630,6 +1662,7 @@ where type ListDecoder = ListDecoder; } +/// Encoder for `stream` pub struct StreamEncoder { deferred: Option>, } @@ -1737,6 +1770,7 @@ where type Encoder = StreamEncoder; } +/// Encoder for `stream>` pub struct StreamEncoderBytes { deferred: Option>, } @@ -1806,6 +1840,7 @@ where type Encoder = StreamEncoderBytes; } +/// Encoder for `stream>` with [AsyncRead] support pub struct StreamEncoderRead { deferred: Option>, } @@ -1933,6 +1968,7 @@ where type Encoder = StreamEncoderRead; } +/// Decoder for `stream` pub struct StreamDecoder where T: Decode, @@ -2066,6 +2102,7 @@ where type ListDecoder = ListDecoder; } +/// Decoder for `stream>` pub struct StreamDecoderBytes { dec: CoreVecDecoderBytes, deferred: Option>>, @@ -2145,6 +2182,7 @@ where type ListDecoder = ListDecoder; } +/// Decoder for `stream>` with [AsyncRead] support pub struct StreamDecoderRead { dec: CoreVecDecoderBytes, deferred: Option>>,