Skip to content

Commit

Permalink
Client subscriber (#130)
Browse files Browse the repository at this point in the history
Subscriber client
  • Loading branch information
asonnino authored Apr 15, 2022
1 parent cace50e commit 2ecbad1
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 21 deletions.
1 change: 1 addition & 0 deletions narwhal/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rand = { version = "0.7.3", optional = true }
crypto = { path = "../crypto" }
config = { path = "../config" }
primary = { path = "../primary" }
blake2 = "0.9"
tracing = "0.1.31"
tokio-util = { version = "0.7.0", features = ["codec"] }
serde = { version = "1.0.136", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl<PublicKey: VerifyingKey> SubscriberConnection<PublicKey> {
.expect("Failed to send new subscriber to core");

// Interact with the subscriber.
// TODO [issue #120]: Better error handling (we have a log of prints and breaks here).
// TODO [issue #120]: Better error handling (we have a lot of prints and breaks here).
loop {
tokio::select! {
// Update the subscriber every time a certificate is sequenced.
Expand Down
3 changes: 1 addition & 2 deletions narwhal/worker/src/helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::worker::SerializedBatchMessage;
use bytes::Bytes;
use config::{Committee, WorkerId};
use crypto::traits::VerifyingKey;
Expand All @@ -10,8 +11,6 @@ use store::Store;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{error, warn};

use crate::processor::SerializedBatchMessage;

#[cfg(test)]
#[path = "tests/helper_tests.rs"]
pub mod helper_tests;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod worker;
#[path = "tests/common.rs"]
mod common;

pub use crate::worker::Worker;
pub use crate::worker::{SerializedBatchMessage, Worker, WorkerMessage};
6 changes: 1 addition & 5 deletions narwhal/worker/src/processor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::worker::SerializedWorkerPrimaryMessage;
use crate::worker::{SerializedBatchMessage, SerializedWorkerPrimaryMessage};
use blake2::digest::Update;
use config::WorkerId;

use primary::{BatchDigest, WorkerPrimaryMessage};
use store::Store;
use tokio::sync::mpsc::{Receiver, Sender};
Expand All @@ -13,9 +12,6 @@ use tokio::sync::mpsc::{Receiver, Sender};
#[path = "tests/processor_tests.rs"]
pub mod processor_tests;

/// Indicates a serialized `WorkerMessage::Batch` message.
pub type SerializedBatchMessage = Vec<u8>;

/// Hashes and stores batches, it then outputs the batch's digest.
pub struct Processor;

Expand Down
2 changes: 1 addition & 1 deletion narwhal/worker/src/quorum_waiter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::processor::SerializedBatchMessage;
use crate::worker::SerializedBatchMessage;
use config::{Committee, Stake};
use crypto::traits::VerifyingKey;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _};
Expand Down
5 changes: 1 addition & 4 deletions narwhal/worker/src/synchronizer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
processor::SerializedBatchMessage,
worker::{Round, SerializedWorkerPrimaryMessage, WorkerMessage},
};
use crate::worker::{Round, SerializedBatchMessage, SerializedWorkerPrimaryMessage, WorkerMessage};
use bytes::Bytes;
use config::{Committee, WorkerId};
use crypto::traits::VerifyingKey;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/worker/src/tests/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{processor::SerializedBatchMessage, worker::WorkerMessage};
use crate::{worker::WorkerMessage, SerializedBatchMessage};
use blake2::digest::Update;
use bytes::Bytes;
use config::{Authority, Committee, PrimaryAddresses, WorkerAddresses};
Expand Down
12 changes: 6 additions & 6 deletions narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
batch_maker::BatchMaker,
helper::Helper,
primary_connector::PrimaryConnector,
processor::{Processor, SerializedBatchMessage},
quorum_waiter::QuorumWaiter,
synchronizer::Synchronizer,
batch_maker::BatchMaker, helper::Helper, primary_connector::PrimaryConnector,
processor::Processor, quorum_waiter::QuorumWaiter, synchronizer::Synchronizer,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -39,6 +35,9 @@ pub type Round = u64;
/// Indicates a serialized `WorkerPrimaryMessage` message.
pub type SerializedWorkerPrimaryMessage = Vec<u8>;

/// Indicates a serialized `WorkerMessage::Batch` message.
pub type SerializedBatchMessage = Vec<u8>;

/// The message exchanged between workers.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "PublicKey: VerifyingKey"))]
Expand Down Expand Up @@ -290,6 +289,7 @@ struct WorkerReceiverHandler<PublicKey: VerifyingKey> {
impl<PublicKey: VerifyingKey> MessageHandler for WorkerReceiverHandler<PublicKey> {
async fn dispatch(&self, writer: &mut Writer, serialized: Bytes) -> Result<(), Box<dyn Error>> {
// Deserialize and parse the message.
// TODO [issue #7]: Do some accounting to prevent bad actors from use all our resources.
match bincode::deserialize(&serialized) {
Ok(WorkerMessage::Batch(..)) => {
self.tx_processor
Expand Down

0 comments on commit 2ecbad1

Please sign in to comment.