Skip to content

Commit

Permalink
doc: add wrpc-transport docs
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Oct 11, 2024
1 parent cca4dd3 commit 6e725fa
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 8 deletions.
2 changes: 1 addition & 1 deletion SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

wRPC is a transport-agnostic protocol designed for asynchronous transmit of WIT function call invocations over network and other means of communication.

wRPC follows client-server model, where peers may *serve* function (servers) and method calls invoked by the other peers (clients).
wRPC follows client-server model, where peers (servers) may *serve* function and method calls invoked by the other peers (clients).

wRPC relies on [component model value definition encoding] for data encoding on the wire.

Expand Down
3 changes: 3 additions & 0 deletions crates/transport/src/frame/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use wasm_tokio::{Leb128DecoderU32, Leb128DecoderU64, Leb128Encoder};

use super::{Frame, FrameRef};

/// [Frame] decoder
pub struct Decoder {
path: Option<Vec<usize>>,
path_cap: usize,
Expand All @@ -15,6 +16,7 @@ pub struct Decoder {
}

impl Decoder {
/// Construct a new [Frame] decoder
#[must_use]
pub fn new(max_depth: u32, max_size: u64) -> Self {
Self {
Expand Down Expand Up @@ -125,6 +127,7 @@ impl tokio_util::codec::Decoder for Decoder {
}
}

/// [Frame] encoder
pub struct Encoder;

impl tokio_util::codec::Encoder<FrameRef<'_>> for Encoder {
Expand Down
5 changes: 5 additions & 0 deletions crates/transport/src/frame/conn/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::ops::{Deref, DerefMut};

use tokio::io::{AsyncRead, AsyncWrite};

/// Accepts connections on a transport
pub trait Accept {
/// Transport-specific invocation context
type Context: Send + Sync + 'static;
Expand All @@ -13,11 +14,13 @@ pub trait Accept {
/// Incoming byte stream
type Incoming: AsyncRead + Send + Sync + Unpin + 'static;

/// Accept a connection returning a pair of streams and connection context
fn accept(
&self,
) -> impl Future<Output = std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>;
}

/// Wrapper returned by [`AcceptExt::map_context`]
pub struct AcceptMapContext<T, F> {
inner: T,
f: F,
Expand All @@ -37,7 +40,9 @@ impl<T, F> DerefMut for AcceptMapContext<T, F> {
}
}

/// Extension trait for [Accept]
pub trait AcceptExt: Accept + Sized {
/// Maps [`Self::Context`](Accept::Context) to a type `T` using `F`
fn map_context<T, F: Fn(Self::Context) -> T>(self, f: F) -> AcceptMapContext<Self, F> {
AcceptMapContext { inner: self, f }
}
Expand Down
1 change: 1 addition & 0 deletions crates/transport/src/frame/conn/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes};
use crate::frame::conn::{egress, ingress, Incoming, Outgoing};
use crate::frame::PROTOCOL;

/// Invoke function `func` on instance `instance`
#[instrument(level = "trace", skip_all)]
pub async fn invoke<P, I, O>(
mut tx: O,
Expand Down
7 changes: 6 additions & 1 deletion crates/transport/src/frame/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ pub use accept::*;
pub use client::*;
pub use server::*;

/// Index trie containing async stream subscriptions
#[derive(Default)]
pub enum IndexTrie {
enum IndexTrie {
#[default]
Empty,
Leaf {
Expand Down Expand Up @@ -123,6 +124,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTrie {
}

impl IndexTrie {
/// Takes the receiver
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn take_rx(&mut self, path: &[usize]) -> Option<mpsc::Receiver<std::io::Result<Bytes>>> {
let Some((i, path)) = path.split_first() else {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl IndexTrie {
}
}

/// Gets a sender
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn get_tx(&mut self, path: &[usize]) -> Option<mpsc::Sender<std::io::Result<Bytes>>> {
let Some((i, path)) = path.split_first() else {
Expand Down Expand Up @@ -269,6 +272,7 @@ impl IndexTrie {
}

pin_project! {
/// Incoming framed stream
#[project = IncomingProj]
pub struct Incoming {
#[pin]
Expand Down Expand Up @@ -332,6 +336,7 @@ impl AsyncRead for Incoming {
}

pin_project! {
/// Outgoing framed stream
#[project = OutgoingProj]
pub struct Outgoing {
#[pin]
Expand Down
13 changes: 12 additions & 1 deletion crates/transport/src/frame/conn/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::frame::conn::{egress, ingress, Accept};
use crate::frame::{Incoming, Outgoing};
use crate::Serve;

/// wRPC server for framed transports
pub struct Server<C, I, O>(Mutex<HashMap<String, HashMap<String, mpsc::Sender<(C, I, O)>>>>);

impl<C, I, O> Default for Server<C, I, O> {
Expand All @@ -27,10 +28,20 @@ impl<C, I, O> Default for Server<C, I, O> {
}
}

/// Error returned by [`Server::accept`]
pub enum AcceptError<C, I, O> {
/// I/O error
IO(std::io::Error),
/// Protocol version is not supported
UnsupportedVersion(u8),
UnhandledFunction { instance: String, name: String },
/// Function was not handled
UnhandledFunction {
/// Instance
instance: String,
/// Function name
name: String,
},
/// Message sending failed
Send(mpsc::error::SendError<(C, I, O)>),
}

Expand Down
6 changes: 6 additions & 0 deletions crates/transport/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport stream framing
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -19,14 +21,18 @@ pub const PROTOCOL: u8 = 0;
/// Owned wRPC frame
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
/// Frame path
pub path: Arc<[usize]>,
/// Frame data
pub data: Bytes,
}

/// wRPC frame reference
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FrameRef<'a> {
/// Frame path
pub path: &'a [usize],
/// Frame data
pub data: &'a [u8],
}

Expand Down
7 changes: 7 additions & 0 deletions crates/transport/src/frame/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! TCP transport
use core::net::SocketAddr;

use anyhow::{bail, Context as _};
Expand All @@ -9,8 +11,13 @@ use tracing::instrument;
use crate::frame::{invoke, Accept, Incoming, Outgoing};
use crate::Invoke;

/// [Invoke] implementation in terms of a single [TcpStream]
///
/// [`Invoke::invoke`] can only be called once on [Invocation],
/// repeated calls with return an error
pub struct Invocation(std::sync::Mutex<Option<TcpStream>>);

/// [Invoke] implementation of a TCP transport
#[derive(Clone, Debug)]
pub struct Client<T>(T);

Expand Down
7 changes: 7 additions & 0 deletions crates/transport/src/frame/unix.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Unix domain socket transport
use std::path::{Path, PathBuf};

use anyhow::{bail, Context as _};
Expand All @@ -9,8 +11,13 @@ use tracing::instrument;
use crate::frame::{invoke, Accept, Incoming, Outgoing};
use crate::Invoke;

/// [Invoke] implementation in terms of a single [UnixStream]
///
/// [`Invoke::invoke`] can only be called once on [Invocation],
/// repeated calls with return an error
pub struct Invocation(std::sync::Mutex<Option<UnixStream>>);

/// [Invoke] implementation of a Unix domain socket transport
#[derive(Clone, Debug)]
pub struct Client<T>(T);

Expand Down
9 changes: 9 additions & 0 deletions crates/transport/src/invoke.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport client handle
use core::future::Future;
use core::mem;
use core::pin::pin;
Expand Down Expand Up @@ -76,9 +78,12 @@ pub trait Invoke: Send + Sync {
P: AsRef<[Option<usize>]> + Send + Sync;
}

/// Wrapper struct returned by [InvokeExt::timeout]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Timeout<'a, T: ?Sized> {
/// Inner [Invoke]
pub inner: &'a T,
/// Invocation timeout
pub timeout: Duration,
}

Expand Down Expand Up @@ -108,9 +113,12 @@ impl<T: Invoke> Invoke for Timeout<'_, T> {
}
}

/// Wrapper struct returned by [InvokeExt::timeout_owned]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutOwned<T> {
/// Inner [Invoke]
pub inner: T,
/// Invocation timeout
pub timeout: Duration,
}

Expand Down Expand Up @@ -138,6 +146,7 @@ impl<T: Invoke> Invoke for TimeoutOwned<T> {
}
}

/// Extension trait for [Invoke]
pub trait InvokeExt: Invoke {
/// Invoke function `func` on instance `instance` using typed `Params` and `Results`
#[instrument(level = "trace", skip(self, cx, params, paths))]
Expand Down
27 changes: 23 additions & 4 deletions crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
#![allow(clippy::type_complexity)]
#![deny(missing_docs)]

//! wRPC transport abstractions, codec and framing
//!
//! wRPC is an RPC framework based on [WIT](https://component-model.bytecodealliance.org/design/wit.html).
//! It follows client-server model, where peers (servers) may serve function and method calls invoked by the other peers (clients).
//!
//! The two main abstractions on top of which wRPC is built are:
//! - [Invoke] - the client-side handle to a wRPC transport, allowing clients to *invoke* WIT functions over wRPC transport
//! - [Serve] - the server-side handle to a wRPC transport, allowing servers to *serve* WIT functions over wRPC transport
//!
//! Implementations of [Invoke] and [Serve] define transport-specific, multiplexed bidirectional byte stream types:
//! - [`Invoke::Incoming`] and [`Serve::Incoming`] represent the stream *incoming* from a peer.
//! - [`Invoke::Outgoing`] and [`Serve::Outgoing`] represent the stream *outgoing* to a peer.
pub mod frame;
pub mod invoke;
Expand Down Expand Up @@ -27,20 +41,25 @@ use bytes::BytesMut;
use tokio::io::{AsyncRead, ReadBuf};
use tracing::trace;

/// Internal workaround trait
///
/// This is an internal trait used as a workaround for
/// https://github.com/rust-lang/rust/issues/63033
#[doc(hidden)]
// This is an internal trait used as a workaround for
// https://github.com/rust-lang/rust/issues/63033
pub trait Captures<'a> {}

impl<'a, T: ?Sized> Captures<'a> for T {}

/// `Index` implementations are capable of multiplexing underlying connections using a particular
/// structural `path`
/// Multiplexes streams
///
/// Implementations of this trait define multiplexing for underlying connections
/// using a particular structural `path`
pub trait Index<T> {
/// Index the entity using a structural `path`
fn index(&self, path: &[usize]) -> anyhow::Result<T>;
}

/// Buffered incoming stream used for decoding values
pub struct Incoming<T> {
buffer: BytesMut,
inner: T,
Expand Down
3 changes: 3 additions & 0 deletions crates/transport/src/serve.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport server handle
use core::future::Future;
use core::mem;
use core::pin::Pin;
Expand Down Expand Up @@ -38,6 +40,7 @@ pub trait Serve: Sync {
> + Send;
}

/// Extension trait for [Serve]
pub trait ServeExt: Serve {
/// Serve function `func` from instance `instance` using typed `Params` and `Results`
#[instrument(level = "trace", skip(self, paths))]
Expand Down
Loading

0 comments on commit 6e725fa

Please sign in to comment.