Skip to content

Commit

Permalink
finish broker <-> user auth
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 5, 2024
1 parent 95f5079 commit 080a1c8
Show file tree
Hide file tree
Showing 18 changed files with 699 additions and 408 deletions.
197 changes: 160 additions & 37 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
//! This file contains the implementation of the `Broker`, which routes messages
//! for the Push CDN.
use std::{marker::PhantomData, sync::Arc};
use std::{
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};

use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
use proto::{
bail,
connection::protocols::{Connection, Listener, Protocol},
connection::{
auth::{broker::BrokerToUser, AuthenticationFlow},
protocols::{Listener, Protocol},
},
crypto::Serializable,
error::{Error, Result},
parse_socket_address,
redis::{self, BrokerIdentifier},
};
use tokio::spawn;
use tracing::warn;
use tokio::{select, spawn, time::sleep};
use tracing::{error, warn};

/// The broker's configuration. We need this when we create a new one.
/// TODO: clean up these generics. could be a generic type that implements both
Expand Down Expand Up @@ -44,45 +55,71 @@ pub struct Config<BrokerSignatureScheme: JfSignatureScheme<PublicParameter = (),
pub maybe_tls_key_path: Option<String>,
}

pub struct Inner<PrivateSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>>
{
/// The broker `Inner` that we use to share common data between broker tasks.
struct Inner<
// TODO: clean these up with some sort of generic trick or something
BrokerSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
BrokerProtocolType: Protocol,
> {
/// The number of connected users (that we post to Redis so that marshals can equally
/// distribute users)
num_connected_users: AtomicU64,

/// A broker identifier that we can use to establish uniqueness among brokers.
identifier: BrokerIdentifier,

// The `Redis` client that we will use to maintain consistency between brokers and marshals
redis_client: redis::Client,

/// The underlying (public) verification key, used to authenticate with other brokers
verification_key: PrivateSignatureScheme::VerificationKey,
verification_key: BrokerSignatureScheme::VerificationKey,

/// The underlying (private) signing key, used to authenticate with other brokers
signing_key: PrivateSignatureScheme::SigningKey,
signing_key: BrokerSignatureScheme::SigningKey,

/// The `PhantomData` that we need to be generic over protocol types.
pd: PhantomData<(UserProtocolType, BrokerProtocolType, UserSignatureScheme)>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
pub struct Broker<
PrivateSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
BrokerSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
BrokerProtocolType: Protocol,
> {
/// The broker's `Inner`. We clone this and pass it around when needed.
inner: Arc<Inner<PrivateSignatureScheme>>,

/// The `PhantomData` we need to be able to be generic over a signature scheme.
pd: PhantomData<PrivateSignatureScheme>,
inner: Arc<
Inner<BrokerSignatureScheme, UserSignatureScheme, UserProtocolType, BrokerProtocolType>,
>,

/// The public (user -> broker) listener
user_listener: UserProtocolType::Listener,

/// The private (broker <-> broker) listener
broker_listener: BrokerProtocolType::Listener,

// The `Redis` client that we will use to maintain consistency between brokers and marshals
redis_client: redis::Client,
}

impl<
PrivateSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
BrokerSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
BrokerProtocolType: Protocol,
> Broker<PrivateSignatureScheme, UserProtocolType, BrokerProtocolType>
> Broker<BrokerSignatureScheme, UserSignatureScheme, UserProtocolType, BrokerProtocolType>
where
UserSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::SigningKey: Serializable,
{
/// Create a new `Broker` from a `Config`
pub async fn new(config: Config<PrivateSignatureScheme>) -> Result<Self> {
///
/// # Errors
/// - If we fail to create the `Redis` client
/// - If we fail to bind to our public endpoint
/// - If we fail to bind to our private endpoint
pub async fn new(config: Config<BrokerSignatureScheme>) -> Result<Self> {
// Extrapolate values from the underlying broker configuration
let Config {
user_advertise_address,
Expand All @@ -99,16 +136,15 @@ impl<
maybe_tls_key_path,
} = config;

// Create a unique broker identifier
let identifier = BrokerIdentifier {
user_advertise_address,
broker_advertise_address,
};

// Create the `Redis` client we will use to maintain consistency
let redis_client = bail!(
redis::Client::new(
redis_endpoint,
Some(BrokerIdentifier {
user_advertise_address,
broker_advertise_address,
}),
)
.await,
redis::Client::new(redis_endpoint, Some(identifier.clone()),).await,
Parse,
"failed to create Redis client"
);
Expand Down Expand Up @@ -145,31 +181,100 @@ impl<
)
);

// Create and return `Self` as wrapping an `Inner` (with things that we need to share)
Ok(Self {
inner: Arc::from(Inner {
verification_key,
signing_key,
num_connected_users: AtomicU64::default(),
redis_client,
identifier,
pd: PhantomData,
}),
user_listener,
broker_listener,
redis_client,
pd: PhantomData,
})
}

async fn handle_broker_connection(connection: BrokerProtocolType::Connection) {}
/// This function handles a broker (private) connection. We take the following steps:
/// 1. Authenticate the broker
/// 2. TODO
async fn handle_broker_connection(
inner: Arc<
Inner<BrokerSignatureScheme, UserSignatureScheme, UserProtocolType, BrokerProtocolType>,
>,
connection: BrokerProtocolType::Connection,
) {
}

/// This function handles a user (public) connection. We take the following steps:
/// 1. Authenticate the user
/// 2. TODO
async fn handle_user_connection(
inner: Arc<
Inner<BrokerSignatureScheme, UserSignatureScheme, UserProtocolType, BrokerProtocolType>,
>,
connection: UserProtocolType::Connection,
) {
// Create verification data from the `Redis` client and our identifier
let mut verification = BrokerToUser {
redis_client: inner.redis_client.clone(),
identifier: inner.identifier.clone(),
};

async fn handle_user_connection(connection: UserProtocolType::Connection) {}
// Verify (authenticate) the connection
if <BrokerToUser as AuthenticationFlow<
UserSignatureScheme,
UserProtocolType,
>>::authenticate(&mut verification, &connection)
.await.is_err()
{
return;
};

println!("meow");
}

/// The main loop for a broker.
/// Consumes self.
///
/// # Errors
/// Right now, we return a `Result` but don't actually ever error.
/// If any of the following tasks exit:
/// - The heartbeat (Redis) task
/// - The user connection handler
/// - The broker connection handler
pub async fn start(self) -> Result<()> {
// Clone `inner` so we can use shared data
let inner = self.inner.clone();

// Spawn the heartbeat task, which we use to register with `Redis` every so often.
let heartbeat_task = spawn(async move {
// Clone the `Redis` client, which needs to be mutable
let mut redis_client = inner.redis_client.clone();
loop {
// Register with `Redis` every 20 seconds, updating our number of connected users
if let Err(err) = redis_client
.perform_heartbeat(
inner.num_connected_users.load(Ordering::Relaxed),
Duration::from_secs(60),
)
.await
{
// If we fail, we want to see this
error!("failed to perform heartbeat: {}", err);
}

// Sleep for 20 seconds
sleep(Duration::from_secs(20)).await;
}
});

// Clone `inner` so we can use shared data
let inner = self.inner.clone();

// Spawn the public (user) listener task
// TODO: maybe macro this, since it's repeat code with the private listener task
spawn(async move {
let user_listener_task = spawn(async move {
loop {
// Accept a connection. If we fail, print the error and keep going.
//
Expand All @@ -183,12 +288,17 @@ impl<
}
};

spawn(Self::handle_user_connection(connection));
// Spawn a task to handle the [user/public] connection
let inner = inner.clone();
spawn(Self::handle_user_connection(inner, connection));
}
});

// Clone `inner` so we can use shared data
let inner = self.inner.clone();

// Spawn the private (broker) listener task
spawn(async move {
let broker_listener_task = spawn(async move {
loop {
// Accept a connection. If we fail, print the error and keep going.
//
Expand All @@ -202,10 +312,23 @@ impl<
}
};

spawn(Self::handle_broker_connection(connection));
// Spawn a task to handle the [broker/private] connection
let inner = inner.clone();
spawn(Self::handle_broker_connection(inner, connection));
}
});

Ok(())
// If one of the tasks exists, we want to return (stopping the program)
select! {
_ = heartbeat_task => {
Err(Error::Exited("heartbeat task exited!".to_string()))
}
_ = user_listener_task => {
Err(Error::Exited("user listener task exited!".to_string()))
}
_ = broker_listener_task => {
Err(Error::Exited("broker listener task exited!".to_string()))
}
}
}
}
4 changes: 2 additions & 2 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() -> Result<()> {
broker_advertise_address: "127.0.0.1:8081".to_string(),
broker_bind_address: "127.0.0.1:8081".to_string(),

redis_endpoint: "127.0.0.1:6789".to_string(),
redis_endpoint: "redis://:changeme!@127.0.0.1:6379".to_string(),

signing_key,
verification_key,
Expand All @@ -41,7 +41,7 @@ async fn main() -> Result<()> {
};

// Create new `Broker`
let marshal = Broker::<BLS, Quic, Tcp>::new(broker_config).await?;
let marshal = Broker::<BLS, BLS, Quic, Tcp>::new(broker_config).await?;

// Start the main loop, consuming it
marshal.start().await?;
Expand Down
25 changes: 13 additions & 12 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
//! In here we define an API that is a little more higher-level and ergonomic
//! for end users. It is a light wrapper on top of a `Sticky` connection.
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
use proto::{
bail,
connection::{
auth::UserToMarshal,
auth::user::UserToMarshalToBroker,
protocols::Protocol,
sticky::{self, Sticky},
},
crypto,
crypto::{self, Serializable},
error::Error,
error::Result,
message::{Broadcast, Direct, Message, Subscribe, Topic, Unsubscribe},
Expand All @@ -22,11 +21,11 @@ use proto::{
pub struct Client<
SignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
ProtocolType: Protocol,
>(Sticky<SignatureScheme, ProtocolType, UserToMarshal>)
>(Sticky<SignatureScheme, ProtocolType, UserToMarshalToBroker<SignatureScheme>>)
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize;
SignatureScheme::Signature: Serializable,
SignatureScheme::VerificationKey: Serializable,
SignatureScheme::SigningKey: Serializable;

pub type Config<SignatureScheme, ProtocolType, AuthFlow> =
sticky::Config<SignatureScheme, ProtocolType, AuthFlow>;
Expand All @@ -36,17 +35,19 @@ impl<
ProtocolType: Protocol,
> Client<SignatureScheme, ProtocolType>
where
SignatureScheme::Signature: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::VerificationKey: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::SigningKey: CanonicalSerialize + CanonicalDeserialize,
SignatureScheme::Signature: Serializable,
SignatureScheme::VerificationKey: Serializable,
SignatureScheme::SigningKey: Serializable,
{
/// Creates a new client from the given `Config`. Immediately will attempt
/// a conection if none is supplied.
///
/// # Errors
/// Errors if the downstream `Sticky` object was unable to be made.
/// This usually happens when we can't bind to the specified endpoint.
pub async fn new(config: Config<SignatureScheme, ProtocolType, UserToMarshal>) -> Result<Self> {
pub async fn new(
config: Config<SignatureScheme, ProtocolType, UserToMarshalToBroker<SignatureScheme>>,
) -> Result<Self> {
Self::new_with_connection(config, Option::None).await
}

Expand All @@ -58,7 +59,7 @@ where
/// Errors if the downstream `Sticky` object was unable to be created.
/// This usually happens when we can't bind to the specified endpoint.
pub async fn new_with_connection(
config: Config<SignatureScheme, ProtocolType, UserToMarshal>,
config: Config<SignatureScheme, ProtocolType, UserToMarshalToBroker<SignatureScheme>>,
connection: Option<ProtocolType::Connection>,
) -> Result<Self> {
Ok(Self(bail!(
Expand Down
Loading

0 comments on commit 080a1c8

Please sign in to comment.