Skip to content

Commit

Permalink
prepare to rewrite quic/tcp to be owned senders and receivers
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 6, 2024
1 parent 4a1e688 commit 71d20a4
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ ark-serialize.workspace = true
tracing-subscriber.workspace = true
parking_lot = "0.12.1"
clap.workspace = true
local-ip-address = "0.5.7"
local-ip-address = "0.5.7"
either = "1.9.0"
53 changes: 24 additions & 29 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
//! This file contains the implementation of the `Broker`, which routes messages
//! for the Push CDN.
// TODO: inter-broker message batching

use std::{
collections::HashSet,
hash::Hash,
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
// TODO: convert QUIC to locked single sender/reciver

mod state;

use std::{marker::PhantomData, sync::Arc, time::Duration};

use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
// TODO: figure out if we should use Tokio's here
use parking_lot::RwLock;
use proto::{
authenticate_with_broker, bail,
connection::{
Expand Down Expand Up @@ -70,26 +62,20 @@ struct Inner<
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
> where
UserSignatureScheme::VerificationKey: Hash,
UserSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::SigningKey: Serializable,
{
/// The number of connected users (that we post to Redis so that marshals can equally
/// distribute users)
num_connected_users: AtomicU64,

/// A broker identifier that we can use to establish uniqueness among brokers.
identifier: BrokerIdentifier,

/// The (clonable) `Redis` client that we will use to maintain consistency between brokers and marshals
redis_client: redis::Client,

/// The list of all brokers we are connected to
brokers_connected: RwLock<HashSet<BrokerIdentifier>>,

/// The underlying (public) verification key, used to authenticate with the server. Checked
/// against the stake table.
/// TODO: verif & signing key in one struct
pub verification_key: BrokerSignatureScheme::VerificationKey,

/// The underlying (private) signing key, used to sign messages to send to the server during the
Expand All @@ -108,7 +94,7 @@ pub struct Broker<
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
> where
UserSignatureScheme::VerificationKey: Hash,
UserSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::SigningKey: Serializable,
Expand All @@ -133,7 +119,7 @@ impl<
> Broker<BrokerSignatureScheme, BrokerProtocolType, UserSignatureScheme, UserProtocolType>
where
UserSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable + Hash,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::SigningKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
BrokerSignatureScheme::VerificationKey: Serializable,
Expand Down Expand Up @@ -210,12 +196,10 @@ where
// Create and return `Self` as wrapping an `Inner` (with things that we need to share)
Ok(Self {
inner: Arc::from(Inner {
num_connected_users: AtomicU64::default(),
redis_client,
identifier,
verification_key,
signing_key,
brokers_connected: RwLock::from(HashSet::new()),
pd: PhantomData,
}),
user_listener,
Expand Down Expand Up @@ -269,8 +253,17 @@ where
};

println!("meow");
// Add to our direct map
// inner.all_connected_users.write().insert(verification_key);

// // Create a new queued connection
// let connection_with_queue = ConnectionWithQueue{
// connection: connection,
// last_sent: SystemTime::now(),
// buffer: Arc::default(),

// }

// // Add to our direct map
// inner.user_to_connection.write().await.insert(verification_key, Either::Left());
}

/// The main loop for a broker.
Expand All @@ -294,7 +287,8 @@ where
// Register with `Redis` every 20 seconds, updating our number of connected users
if let Err(err) = redis_client
.perform_heartbeat(
inner.num_connected_users.load(Ordering::Relaxed),
// todo: actually pull in this number
0,
Duration::from_secs(60),
)
.await
Expand All @@ -307,7 +301,8 @@ where
match redis_client.get_other_brokers().await {
Ok(brokers) => {
// Calculate the difference, spawn tasks to connect to them
for broker in brokers.difference(&inner.brokers_connected.read()) {
// TODO for broker in brokers.difference(&inner.brokers_connected.read()) {
for broker in brokers {
// TODO: make this into a separate function
// Extrapolate the address to connect to
let to_connect_address = broker.broker_advertise_address.clone();
Expand Down
14 changes: 14 additions & 0 deletions broker/src/state/broker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// use std::{
// sync::{atomic::AtomicU64, Arc},
// time::SystemTime,
// };

// use proto::connection::protocols::Protocol;
// use tokio::sync::Mutex;

// pub struct ConnectionWithQueue<ProtocolType: Protocol> {
// pub connection: Arc<Mutex<ProtocolType::Connection>>,
// pub last_sent: SystemTime,
// pub buffer: Arc<Mutex<Vec<Arc<Vec<u8>>>>>,
// pub size: AtomicU64,
// }
2 changes: 2 additions & 0 deletions broker/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod broker;
pub mod user;
Empty file added broker/src/state/user.rs
Empty file.
6 changes: 3 additions & 3 deletions proto/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use jf_primitives::signatures::SignatureScheme;
use rand::{CryptoRng, RngCore};
use rcgen::generate_simple_self_signed;
use rustls::ClientConfig;
use std::sync::Arc;
use std::{hash::Hash, sync::Arc};

/// Helps clean up some trait boundaries
pub trait Serializable: CanonicalSerialize + CanonicalDeserialize {}
impl<T: CanonicalSerialize + CanonicalDeserialize> Serializable for T {}
pub trait Serializable: CanonicalSerialize + CanonicalDeserialize + Eq + PartialEq + Hash{}
impl<T: CanonicalSerialize + CanonicalDeserialize + Eq + PartialEq + Hash> Serializable for T {}

/// The oxymoron function. Used mostly with crypto key generation to generate
/// "random" values that are actually deterministic based on the input.
Expand Down

0 comments on commit 71d20a4

Please sign in to comment.