Skip to content

Commit

Permalink
listeners for both tcp and quic
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 2, 2024
1 parent d79f7d6 commit 98d5ece
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 110 deletions.
28 changes: 14 additions & 14 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use proto::{
bail,
connection::{
flow::Flow,
protocols::Protocol,
sticky::{self, Sticky},
Connection,
},
crypto,
error::Error,
Expand All @@ -23,18 +23,18 @@ use proto::{
/// more ergonomic.
pub struct Client<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
ConnectionFlow: Flow<SignatureScheme, ConnectionType>,
>(Sticky<SignatureScheme, ConnectionType, ConnectionFlow>);
ProtocolType: Protocol,
ConnectionFlow: Flow<SignatureScheme, ProtocolType>,
>(Sticky<SignatureScheme, ProtocolType, ConnectionFlow>);

pub type Config<SignatureScheme, ConnectionType, ConnectionFlow> =
sticky::Config<SignatureScheme, ConnectionType, ConnectionFlow>;
pub type Config<SignatureScheme, ProtocolType, ConnectionFlow> =
sticky::Config<SignatureScheme, ProtocolType, ConnectionFlow>;

impl<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
ConnectionFlow: Flow<SignatureScheme, ConnectionType>,
> Client<SignatureScheme, ConnectionType, ConnectionFlow>
ProtocolType: Protocol,
ConnectionFlow: Flow<SignatureScheme, ProtocolType>,
> Client<SignatureScheme, ProtocolType, ConnectionFlow>
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
Expand All @@ -47,7 +47,7 @@ where
/// Errors if the downstream `Sticky` object was unable to be made.
/// This usually happens when we can't bind to the specified endpoint.
pub async fn new(
config: Config<SignatureScheme, ConnectionType, ConnectionFlow>,
config: Config<SignatureScheme, ProtocolType, ConnectionFlow>,
) -> Result<Self> {
Self::new_with_connection(config, Option::None).await
}
Expand All @@ -60,8 +60,8 @@ where
/// Errors if the downstream `Sticky` object was unable to be created.
/// This usually happens when we can't bind to the specified endpoint.
pub async fn new_with_connection(
config: Config<SignatureScheme, ConnectionType, ConnectionFlow>,
connection: Option<ConnectionType>,
config: Config<SignatureScheme, ProtocolType, ConnectionFlow>,
connection: Option<ProtocolType::Connection>,
) -> Result<Self> {
Ok(Self(bail!(
Sticky::from_config_and_connection(config, connection).await,
Expand Down Expand Up @@ -128,7 +128,7 @@ where
///
/// # Errors
/// If the connection or serialization has failed
///
///
/// TODO IMPORTANT: see if we want this, or if we'd prefer `set_subscriptions()`
pub async fn subscribe(&self, topics: Vec<Topic>) -> Result<()> {
// Lock subscribed topics here so if we're reconnecting we maintain parity
Expand Down Expand Up @@ -169,7 +169,7 @@ where

/// Sends a pre-formed message over the wire. Various functions make use
/// of this one downstream.
///
///
/// # Errors
/// - if the downstream message sending fails.
pub async fn send_message_raw(&self, message: Arc<Message>) -> Result<()> {
Expand Down
10 changes: 8 additions & 2 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;

use client::{Client, Config};
use proto::{
connection::{fallible::quic::Quic, flow::ToMarshal},
connection::{flow::ToMarshal, protocols::quic::Quic},
crypto,
error::Result,
message::Topic,
Expand All @@ -14,7 +14,7 @@ use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme
async fn main() -> Result<()> {
let (signing_key, verification_key) = crypto::generate_random_keypair::<BLS>()?;

let _client = Client::<BLS, Quic, ToMarshal>::new(Config {
let client = Client::<BLS, Quic, ToMarshal>::new(Config {
verification_key,
signing_key,
remote_address: "google.com:80".to_string(),
Expand All @@ -23,5 +23,11 @@ async fn main() -> Result<()> {
})
.await?;

client
.send_direct_message(verification_key, vec![123])
.await?;

println!("{:?}", client.receive_message().await);

Ok(())
}
7 changes: 0 additions & 7 deletions proto/src/connection/fallible/mod.rs

This file was deleted.

20 changes: 10 additions & 10 deletions proto/src/connection/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;

use crate::{
bail,
connection::Connection,
crypto::{self, DeterministicRng},
error::{Error, Result},
message::{AuthenticateWithKey, AuthenticateWithPermit, Message, Subscribe, Topic},
};

use super::protocols::Protocol;
use crate::connection::protocols::Connection;

/// TODO: BIDIRECTIONAL AUTHENTICATION FOR USERS<->BROKERS
///
/// The `Flow` trait implements a connection flow that takes in an endpoint,
/// signing key, and verification key and returns a connection.
#[async_trait]
pub trait Flow<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
ProtocolType: Protocol,
>: Send + Sync
{
/// This is the meat of `Flow`. We define this for every type of connection flow we have.
Expand All @@ -36,7 +38,7 @@ pub trait Flow<
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
subscribed_topics: Vec<Topic>,
) -> Result<ConnectionType>;
) -> Result<ProtocolType::Connection>;
}

/// This struct implements `Flow`. It defines an implementation wherein we connect
Expand All @@ -47,8 +49,8 @@ pub struct ToMarshal {}
#[async_trait]
impl<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ConnectionType: Connection,
> Flow<SignatureScheme, ConnectionType> for ToMarshal
ProtocolType: Protocol,
> Flow<SignatureScheme, ProtocolType> for ToMarshal
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
Expand All @@ -58,17 +60,15 @@ where
/// 1. Authenticate with the marshal with a signed message, who optionally
/// returns a permit and a server address
/// 2. Use the permit and server address to connect to the broker
/// 3. ???
/// 4. Profit
async fn connect(
endpoint: String,
signing_key: &SignatureScheme::SigningKey,
verification_key: &SignatureScheme::VerificationKey,
subscribed_topics: Vec<Topic>,
) -> Result<ConnectionType> {
) -> Result<ProtocolType::Connection> {
// Create the initial connection, which is unauthenticated at this point
let connection = bail!(
ConnectionType::connect(endpoint).await,
ProtocolType::Connection::connect(endpoint).await,
Connection,
"failed to connect to marshal"
);
Expand Down Expand Up @@ -153,7 +153,7 @@ where

// Create a connection to the broker. Drops the connection to the marshal.
let connection = bail!(
ConnectionType::connect(broker_address).await,
ProtocolType::Connection::connect(broker_address).await,
Connection,
"failed to connect to broker"
);
Expand Down
31 changes: 3 additions & 28 deletions proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,7 @@
use std::sync::Arc;
//! In this file we define network abstractions, which can be implemented
//! for any network protocol.
use async_trait::async_trait;

use crate::{error::Result, message::Message};

pub mod fallible;
pub mod flow;
pub mod protocols;
pub mod sticky;

#[async_trait]
pub trait Connection: Send + Sync {
/// Receive a single message from the connection.
///
/// # Errors
/// Errors if we either fail to receive the message. This usually means a connection problem.
async fn recv_message(&self) -> Result<Message>;

/// Send a single message over the connection.
///
/// # Errors
/// Errors if we fail to deliver the message. This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()>;

/// Connect to a remote address, returning an instance of `Self`.
///
/// # Errors
/// Errors if we fail to connect or if we fail to bind to the interface we want.
async fn connect(remote_endpoint: String) -> Result<Self>
where
Self: Sized;
}
64 changes: 64 additions & 0 deletions proto/src/connection/protocols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! This module defines connections, listeners, and their implementations.
use std::{net::SocketAddr, sync::Arc};

use async_trait::async_trait;

use crate::{error::Result, message::Message};
pub mod quic;
pub mod tcp;

/// Assert that we are at _least_ running on a 64-bit system
/// TODO: find out if there is a better way than the `u64` cast
const _: [(); 0 - (!(usize::BITS >= u64::BITS)) as usize] = [];


pub trait Protocol: Send + Sync {
type Connection: Connection;
type Listener: Listener<Self::Connection>;
}

#[async_trait]
pub trait Connection: Send + Sync {
/// Receive a single message from the connection.
///
/// # Errors
/// Errors if we either fail to receive the message. This usually means a connection problem.
async fn recv_message(&self) -> Result<Message>;

/// Send a single message over the connection.
///
/// # Errors
/// Errors if we fail to deliver the message. This usually means a connection problem.
async fn send_message(&self, message: Arc<Message>) -> Result<()>;

/// Connect to a remote address, returning an instance of `Self`.
///
/// # Errors
/// Errors if we fail to connect or if we fail to bind to the interface we want.
async fn connect(remote_endpoint: String) -> Result<Self>
where
Self: Sized;
}

#[async_trait]
pub trait Listener<ConnectionType: Connection>: Send + Sync {
/// Bind to the local address, returning an instance of `Self`.
///
/// # Errors
/// If we fail to bind to the given socket address
async fn bind(
bind_address: SocketAddr,
maybe_tls_cert_path: Option<String>,
maybe_tls_key_path: Option<String>,
) -> Result<Self>
where
Self: Sized;

/// Accept a connection from the local, bound socket.
/// Returns a connection or an error if we encountered one.
///
/// # Errors
/// If we fail to accept a connection
async fn accept(&self) -> Result<ConnectionType>;
}
Loading

0 comments on commit 98d5ece

Please sign in to comment.