Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: add wrpc-transport docs #390

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

wRPC is a transport-agnostic protocol designed for asynchronous transmit of WIT function call invocations over network and other means of communication.

wRPC follows client-server model, where peers may *serve* function (servers) and method calls invoked by the other peers (clients).
wRPC follows client-server model, where peers (servers) may *serve* function and method calls invoked by the other peers (clients).

wRPC relies on [component model value definition encoding] for data encoding on the wire.

Expand Down
3 changes: 3 additions & 0 deletions crates/transport/src/frame/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use wasm_tokio::{Leb128DecoderU32, Leb128DecoderU64, Leb128Encoder};

use super::{Frame, FrameRef};

/// [Frame] decoder
pub struct Decoder {
path: Option<Vec<usize>>,
path_cap: usize,
Expand All @@ -15,6 +16,7 @@ pub struct Decoder {
}

impl Decoder {
/// Construct a new [Frame] decoder
#[must_use]
pub fn new(max_depth: u32, max_size: u64) -> Self {
Self {
Expand Down Expand Up @@ -125,6 +127,7 @@ impl tokio_util::codec::Decoder for Decoder {
}
}

/// [Frame] encoder
pub struct Encoder;

impl tokio_util::codec::Encoder<FrameRef<'_>> for Encoder {
Expand Down
5 changes: 5 additions & 0 deletions crates/transport/src/frame/conn/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::ops::{Deref, DerefMut};

use tokio::io::{AsyncRead, AsyncWrite};

/// Accepts connections on a transport
pub trait Accept {
/// Transport-specific invocation context
type Context: Send + Sync + 'static;
Expand All @@ -13,11 +14,13 @@ pub trait Accept {
/// Incoming byte stream
type Incoming: AsyncRead + Send + Sync + Unpin + 'static;

/// Accept a connection returning a pair of streams and connection context
fn accept(
&self,
) -> impl Future<Output = std::io::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>;
}

/// Wrapper returned by [`AcceptExt::map_context`]
pub struct AcceptMapContext<T, F> {
inner: T,
f: F,
Expand All @@ -37,7 +40,9 @@ impl<T, F> DerefMut for AcceptMapContext<T, F> {
}
}

/// Extension trait for [Accept]
pub trait AcceptExt: Accept + Sized {
/// Maps [`Self::Context`](Accept::Context) to a type `T` using `F`
fn map_context<T, F: Fn(Self::Context) -> T>(self, f: F) -> AcceptMapContext<Self, F> {
AcceptMapContext { inner: self, f }
}
Expand Down
1 change: 1 addition & 0 deletions crates/transport/src/frame/conn/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes};
use crate::frame::conn::{egress, ingress, Incoming, Outgoing};
use crate::frame::PROTOCOL;

/// Invoke function `func` on instance `instance`
#[instrument(level = "trace", skip_all)]
pub async fn invoke<P, I, O>(
mut tx: O,
Expand Down
7 changes: 6 additions & 1 deletion crates/transport/src/frame/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ pub use accept::*;
pub use client::*;
pub use server::*;

/// Index trie containing async stream subscriptions
#[derive(Default)]
pub enum IndexTrie {
enum IndexTrie {
#[default]
Empty,
Leaf {
Expand Down Expand Up @@ -123,6 +124,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTrie {
}

impl IndexTrie {
/// Takes the receiver
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn take_rx(&mut self, path: &[usize]) -> Option<mpsc::Receiver<std::io::Result<Bytes>>> {
let Some((i, path)) = path.split_first() else {
Expand Down Expand Up @@ -157,6 +159,7 @@ impl IndexTrie {
}
}

/// Gets a sender
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
fn get_tx(&mut self, path: &[usize]) -> Option<mpsc::Sender<std::io::Result<Bytes>>> {
let Some((i, path)) = path.split_first() else {
Expand Down Expand Up @@ -269,6 +272,7 @@ impl IndexTrie {
}

pin_project! {
/// Incoming framed stream
#[project = IncomingProj]
pub struct Incoming {
#[pin]
Expand Down Expand Up @@ -332,6 +336,7 @@ impl AsyncRead for Incoming {
}

pin_project! {
/// Outgoing framed stream
#[project = OutgoingProj]
pub struct Outgoing {
#[pin]
Expand Down
13 changes: 12 additions & 1 deletion crates/transport/src/frame/conn/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::frame::conn::{egress, ingress, Accept};
use crate::frame::{Incoming, Outgoing};
use crate::Serve;

/// wRPC server for framed transports
pub struct Server<C, I, O>(Mutex<HashMap<String, HashMap<String, mpsc::Sender<(C, I, O)>>>>);

impl<C, I, O> Default for Server<C, I, O> {
Expand All @@ -27,10 +28,20 @@ impl<C, I, O> Default for Server<C, I, O> {
}
}

/// Error returned by [`Server::accept`]
pub enum AcceptError<C, I, O> {
/// I/O error
IO(std::io::Error),
/// Protocol version is not supported
UnsupportedVersion(u8),
UnhandledFunction { instance: String, name: String },
/// Function was not handled
UnhandledFunction {
/// Instance
instance: String,
/// Function name
name: String,
},
/// Message sending failed
Send(mpsc::error::SendError<(C, I, O)>),
}

Expand Down
6 changes: 6 additions & 0 deletions crates/transport/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport stream framing

use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -19,14 +21,18 @@ pub const PROTOCOL: u8 = 0;
/// Owned wRPC frame
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
/// Frame path
pub path: Arc<[usize]>,
/// Frame data
pub data: Bytes,
}

/// wRPC frame reference
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FrameRef<'a> {
/// Frame path
pub path: &'a [usize],
/// Frame data
pub data: &'a [u8],
}

Expand Down
7 changes: 7 additions & 0 deletions crates/transport/src/frame/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! TCP transport

use core::net::SocketAddr;

use anyhow::{bail, Context as _};
Expand All @@ -9,8 +11,13 @@ use tracing::instrument;
use crate::frame::{invoke, Accept, Incoming, Outgoing};
use crate::Invoke;

/// [Invoke] implementation in terms of a single [TcpStream]
///
/// [`Invoke::invoke`] can only be called once on [Invocation],
/// repeated calls with return an error
pub struct Invocation(std::sync::Mutex<Option<TcpStream>>);

/// [Invoke] implementation of a TCP transport
#[derive(Clone, Debug)]
pub struct Client<T>(T);

Expand Down
7 changes: 7 additions & 0 deletions crates/transport/src/frame/unix.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Unix domain socket transport

use std::path::{Path, PathBuf};

use anyhow::{bail, Context as _};
Expand All @@ -9,8 +11,13 @@ use tracing::instrument;
use crate::frame::{invoke, Accept, Incoming, Outgoing};
use crate::Invoke;

/// [Invoke] implementation in terms of a single [UnixStream]
///
/// [`Invoke::invoke`] can only be called once on [Invocation],
/// repeated calls with return an error
pub struct Invocation(std::sync::Mutex<Option<UnixStream>>);

/// [Invoke] implementation of a Unix domain socket transport
#[derive(Clone, Debug)]
pub struct Client<T>(T);

Expand Down
9 changes: 9 additions & 0 deletions crates/transport/src/invoke.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport client handle

use core::future::Future;
use core::mem;
use core::pin::pin;
Expand Down Expand Up @@ -76,9 +78,12 @@ pub trait Invoke: Send + Sync {
P: AsRef<[Option<usize>]> + Send + Sync;
}

/// Wrapper struct returned by [InvokeExt::timeout]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Timeout<'a, T: ?Sized> {
/// Inner [Invoke]
pub inner: &'a T,
/// Invocation timeout
pub timeout: Duration,
}

Expand Down Expand Up @@ -108,9 +113,12 @@ impl<T: Invoke> Invoke for Timeout<'_, T> {
}
}

/// Wrapper struct returned by [InvokeExt::timeout_owned]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutOwned<T> {
/// Inner [Invoke]
pub inner: T,
/// Invocation timeout
pub timeout: Duration,
}

Expand Down Expand Up @@ -138,6 +146,7 @@ impl<T: Invoke> Invoke for TimeoutOwned<T> {
}
}

/// Extension trait for [Invoke]
pub trait InvokeExt: Invoke {
/// Invoke function `func` on instance `instance` using typed `Params` and `Results`
#[instrument(level = "trace", skip(self, cx, params, paths))]
Expand Down
27 changes: 23 additions & 4 deletions crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
#![allow(clippy::type_complexity)]
#![deny(missing_docs)]

//! wRPC transport abstractions, codec and framing
//!
//! wRPC is an RPC framework based on [WIT](https://component-model.bytecodealliance.org/design/wit.html).
//! It follows client-server model, where peers (servers) may serve function and method calls invoked by the other peers (clients).
//!
//! The two main abstractions on top of which wRPC is built are:
//! - [Invoke] - the client-side handle to a wRPC transport, allowing clients to *invoke* WIT functions over wRPC transport
//! - [Serve] - the server-side handle to a wRPC transport, allowing servers to *serve* WIT functions over wRPC transport
//!
//! Implementations of [Invoke] and [Serve] define transport-specific, multiplexed bidirectional byte stream types:
//! - [`Invoke::Incoming`] and [`Serve::Incoming`] represent the stream *incoming* from a peer.
//! - [`Invoke::Outgoing`] and [`Serve::Outgoing`] represent the stream *outgoing* to a peer.

pub mod frame;
pub mod invoke;
Expand Down Expand Up @@ -27,20 +41,25 @@ use bytes::BytesMut;
use tokio::io::{AsyncRead, ReadBuf};
use tracing::trace;

/// Internal workaround trait
///
/// This is an internal trait used as a workaround for
/// https://github.com/rust-lang/rust/issues/63033
#[doc(hidden)]
// This is an internal trait used as a workaround for
// https://github.com/rust-lang/rust/issues/63033
pub trait Captures<'a> {}

impl<'a, T: ?Sized> Captures<'a> for T {}

/// `Index` implementations are capable of multiplexing underlying connections using a particular
/// structural `path`
/// Multiplexes streams
///
/// Implementations of this trait define multiplexing for underlying connections
/// using a particular structural `path`
pub trait Index<T> {
/// Index the entity using a structural `path`
fn index(&self, path: &[usize]) -> anyhow::Result<T>;
}

/// Buffered incoming stream used for decoding values
pub struct Incoming<T> {
buffer: BytesMut,
inner: T,
Expand Down
3 changes: 3 additions & 0 deletions crates/transport/src/serve.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! wRPC transport server handle

use core::future::Future;
use core::mem;
use core::pin::Pin;
Expand Down Expand Up @@ -38,6 +40,7 @@ pub trait Serve: Sync {
> + Send;
}

/// Extension trait for [Serve]
pub trait ServeExt: Serve {
/// Serve function `func` from instance `instance` using typed `Params` and `Results`
#[instrument(level = "trace", skip(self, paths))]
Expand Down
Loading
Loading