diff --git a/src/entry_writer.rs b/src/entry_writer.rs
index 55544c9560d7e8..18a37481dc4e8b 100644
--- a/src/entry_writer.rs
+++ b/src/entry_writer.rs
@@ -2,7 +2,7 @@
 
 use bank::Bank;
 use entry::Entry;
-use ledger;
+use ledger::Block;
 use packet;
 use result::Result;
 use serde_json;
@@ -63,7 +63,7 @@ impl<'a> EntryWriter<'a> {
         let mut q = VecDeque::new();
         let list = self.write_entries(writer, entry_receiver)?;
         trace!("New blobs? {}", list.len());
-        ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
+        list.to_blobs(blob_recycler, &mut q);
         if !q.is_empty() {
             trace!("broadcasting {}", q.len());
             broadcast.send(q)?;
diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs
new file mode 100644
index 00000000000000..71e633d67b78c0
--- /dev/null
+++ b/src/fetch_stage.rs
@@ -0,0 +1,31 @@
+//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
+
+use packet;
+use std::net::UdpSocket;
+use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
+use std::sync::mpsc::channel;
+use std::thread::JoinHandle;
+use streamer;
+
+pub struct FetchStage {
+    pub packet_receiver: streamer::PacketReceiver,
+    pub thread_hdl: JoinHandle<()>,
+}
+
+impl FetchStage {
+    pub fn new(
+        socket: UdpSocket,
+        exit: Arc<AtomicBool>,
+        packet_recycler: packet::PacketRecycler,
+    ) -> Self {
+        let (packet_sender, packet_receiver) = channel();
+        let thread_hdl =
+            streamer::receiver(socket, exit.clone(), packet_recycler.clone(), packet_sender);
+
+        FetchStage {
+            packet_receiver,
+            thread_hdl,
+        }
+    }
+}
diff --git a/src/ledger.rs b/src/ledger.rs
index e0d86438bbee6f..85bc6cb59d5a94 100644
--- a/src/ledger.rs
+++ b/src/ledger.rs
@@ -16,6 +16,7 @@ use transaction::Transaction;
 pub trait Block {
     /// Verifies the hashes and counts of a slice of transactions are all consistent.
     fn verify(&self, start_hash: &Hash) -> bool;
+    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>);
 }
 
 impl Block for [Entry] {
@@ -24,6 +25,66 @@ impl Block for [Entry] {
         let entry_pairs = genesis.par_iter().chain(self).zip(self);
         entry_pairs.all(|(x0, x1)| x1.verify(&x0.id))
     }
+
+    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
+        let mut start = 0;
+        let mut end = 0;
+        while start < self.len() {
+            let mut entries: Vec<Vec<Entry>> = Vec::new();
+            let mut total = 0;
+            for i in &self[start..] {
+                total += size_of::<Transaction>() * i.transactions.len();
+                total += size_of::<Entry>();
+                if total >= BLOB_DATA_SIZE {
+                    break;
+                }
+                end += 1;
+            }
+            // See if we need to split the transactions
+            if end <= start {
+                let mut transaction_start = 0;
+                let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
+                let total_entry_chunks = (self[end].transactions.len() + num_transactions_per_blob
+                    - 1) / num_transactions_per_blob;
+                trace!(
+                    "splitting transactions end: {} total_chunks: {}",
+                    end,
+                    total_entry_chunks
+                );
+                for _ in 0..total_entry_chunks {
+                    let transaction_end = min(
+                        transaction_start + num_transactions_per_blob,
+                        self[end].transactions.len(),
+                    );
+                    let mut entry = Entry {
+                        num_hashes: self[end].num_hashes,
+                        id: self[end].id,
+                        transactions: self[end].transactions[transaction_start..transaction_end]
+                            .to_vec(),
+                    };
+                    entries.push(vec![entry]);
+                    transaction_start = transaction_end;
+                }
+                end += 1;
+            } else {
+                entries.push(self[start..end].to_vec());
+            }
+
+            for entry in entries {
+                let b = blob_recycler.allocate();
+                let pos = {
+                    let mut bd = b.write().unwrap();
+                    let mut out = Cursor::new(bd.data_mut());
+                    serialize_into(&mut out, &entry).expect("failed to serialize output");
+                    out.position() as usize
+                };
+                assert!(pos < BLOB_SIZE);
+                b.write().unwrap().set_size(pos);
+                q.push_back(b);
+            }
+            start = end;
+        }
+    }
 }
 
 /// Create a vector of Entries of length `transaction_batches.len()` from `start_hash` hash, `num_hashes`, and `transaction_batches`.
@@ -43,70 +104,6 @@ pub fn next_entries(
     entries
 }
 
-pub fn process_entry_list_into_blobs(
-    list: &Vec<Entry>,
-    blob_recycler: &packet::BlobRecycler,
-    q: &mut VecDeque<SharedBlob>,
-) {
-    let mut start = 0;
-    let mut end = 0;
-    while start < list.len() {
-        let mut entries: Vec<Vec<Entry>> = Vec::new();
-        let mut total = 0;
-        for i in &list[start..] {
-            total += size_of::<Transaction>() * i.transactions.len();
-            total += size_of::<Entry>();
-            if total >= BLOB_DATA_SIZE {
-                break;
-            }
-            end += 1;
-        }
-        // See if we need to split the transactions
-        if end <= start {
-            let mut transaction_start = 0;
-            let num_transactions_per_blob = BLOB_DATA_SIZE / size_of::<Transaction>();
-            let total_entry_chunks = (list[end].transactions.len() + num_transactions_per_blob - 1)
-                / num_transactions_per_blob;
-            trace!(
-                "splitting transactions end: {} total_chunks: {}",
-                end,
-                total_entry_chunks
-            );
-            for _ in 0..total_entry_chunks {
-                let transaction_end = min(
-                    transaction_start + num_transactions_per_blob,
-                    list[end].transactions.len(),
-                );
-                let mut entry = Entry {
-                    num_hashes: list[end].num_hashes,
-                    id: list[end].id,
-                    transactions: list[end].transactions[transaction_start..transaction_end]
-                        .to_vec(),
-                };
-                entries.push(vec![entry]);
-                transaction_start = transaction_end;
-            }
-            end += 1;
-        } else {
-            entries.push(list[start..end].to_vec());
-        }
-
-        for entry in entries {
-            let b = blob_recycler.allocate();
-            let pos = {
-                let mut bd = b.write().unwrap();
-                let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &entry).expect("failed to serialize output");
-                out.position() as usize
-            };
-            assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
-            q.push_back(b);
-        }
-        start = end;
-    }
-}
-
 pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> Vec<Entry> {
     let mut entries_to_apply: Vec<Entry> = Vec::new();
     let mut last_id = Hash::default();
@@ -159,13 +156,12 @@ mod tests {
         let transactions = vec![tx0.clone(); 10000];
         let e0 = Entry::new(&zero, 0, transactions);
 
-        let entry_list = vec![e0.clone(); 1];
+        let entries = vec![e0.clone(); 1];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
-        process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
-        let entries = reconstruct_entries_from_blobs(&blob_q);
+        entries.to_blobs(&blob_recycler, &mut blob_q);
 
-        assert_eq!(entry_list, entries);
+        assert_eq!(reconstruct_entries_from_blobs(&blob_q), entries);
     }
 
     #[test]
diff --git a/src/lib.rs b/src/lib.rs
index 184da6fdee3ca5..7ce12c24444746 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,6 +6,7 @@ pub mod entry;
 pub mod entry_writer;
 #[cfg(feature = "erasure")]
 pub mod erasure;
+pub mod fetch_stage;
 pub mod hash;
 pub mod ledger;
 pub mod logger;
diff --git a/src/server.rs b/src/server.rs
index ab4f5be8bb1587..5c28a6282f03ed 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,15 +1,17 @@
 //! The `server` module hosts all the server microservices.
 
 use bank::Bank;
-use crdt::ReplicatedData;
+use crdt::{Crdt, ReplicatedData};
 use hash::Hash;
+use packet;
 use rpu::Rpu;
 use std::io::Write;
 use std::net::UdpSocket;
-use std::sync::Arc;
 use std::sync::atomic::AtomicBool;
+use std::sync::{Arc, RwLock};
 use std::thread::JoinHandle;
 use std::time::Duration;
+use streamer;
 use tpu::Tpu;
 use tvu::Tvu;
 
@@ -35,18 +37,34 @@ impl Server {
         let mut thread_hdls = vec![];
         let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone());
         thread_hdls.extend(rpu.thread_hdls);
+
+        let blob_recycler = packet::BlobRecycler::default();
         let tpu = Tpu::new(
             bank.clone(),
             start_hash,
             tick_duration,
-            me,
             transactions_socket,
-            broadcast_socket,
-            gossip_socket,
+            blob_recycler.clone(),
             exit.clone(),
             writer,
         );
         thread_hdls.extend(tpu.thread_hdls);
+
+        let crdt = Arc::new(RwLock::new(Crdt::new(me)));
+        let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
+        let window = streamer::default_window();
+        let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip_socket, exit.clone());
+
+        let t_broadcast = streamer::broadcaster(
+            broadcast_socket,
+            exit.clone(),
+            crdt.clone(),
+            window,
+            blob_recycler.clone(),
+            tpu.blob_receiver,
+        );
+        thread_hdls.extend(vec![t_gossip, t_listen, t_broadcast]);
+
         Server { thread_hdls }
     }
     pub fn new_validator(
diff --git a/src/tpu.rs b/src/tpu.rs
index b813bd3275bc05..9737f27fdfbbd0 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -3,22 +3,22 @@
 
 use bank::Bank;
 use banking_stage::BankingStage;
-use crdt::{Crdt, ReplicatedData};
+use fetch_stage::FetchStage;
 use hash::Hash;
-use packet;
+use packet::{BlobRecycler, PacketRecycler};
 use record_stage::RecordStage;
 use sigverify_stage::SigVerifyStage;
 use std::io::Write;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
-use std::sync::mpsc::channel;
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 use std::time::Duration;
-use streamer;
+use streamer::BlobReceiver;
 use write_stage::WriteStage;
 
 pub struct Tpu {
+    pub blob_receiver: BlobReceiver,
     pub thread_hdls: Vec<JoinHandle<()>>,
 }
 
@@ -27,25 +27,18 @@ impl Tpu {
         bank: Arc<Bank>,
         start_hash: Hash,
         tick_duration: Option<Duration>,
-        me: ReplicatedData,
         transactions_socket: UdpSocket,
-        broadcast_socket: UdpSocket,
-        gossip: UdpSocket,
+        blob_recycler: BlobRecycler,
         exit: Arc<AtomicBool>,
         writer: W,
     ) -> Self {
-        let packet_recycler = packet::PacketRecycler::default();
-        let (packet_sender, packet_receiver) = channel();
-        let t_receiver = streamer::receiver(
-            transactions_socket,
-            exit.clone(),
-            packet_recycler.clone(),
-            packet_sender,
-        );
+        let packet_recycler = PacketRecycler::default();
 
-        let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
+        let fetch_stage =
+            FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());
+
+        let sigverify_stage = SigVerifyStage::new(exit.clone(), fetch_stage.packet_receiver);
 
-        let blob_recycler = packet::BlobRecycler::default();
         let banking_stage = BankingStage::new(
             bank.clone(),
             exit.clone(),
@@ -64,30 +57,16 @@
             record_stage.entry_receiver,
         );
 
-        let crdt = Arc::new(RwLock::new(Crdt::new(me)));
-        let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
-        let window = streamer::default_window();
-        let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
-
-        let t_broadcast = streamer::broadcaster(
-            broadcast_socket,
-            exit.clone(),
-            crdt.clone(),
-            window,
-            blob_recycler.clone(),
-            write_stage.blob_receiver,
-        );
-
         let mut thread_hdls = vec![
-            t_receiver,
+            fetch_stage.thread_hdl,
             banking_stage.thread_hdl,
             record_stage.thread_hdl,
             write_stage.thread_hdl,
-            t_gossip,
-            t_listen,
-            t_broadcast,
         ];
         thread_hdls.extend(sigverify_stage.thread_hdls.into_iter());
-        Tpu { thread_hdls }
+        Tpu {
+            blob_receiver: write_stage.blob_receiver,
+            thread_hdls,
+        }
     }
 }
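
---
Reviewer notes (not part of the patch):

The ledger change folds the old `process_entry_list_into_blobs` into the `Block` trait, so any
`[Entry]` slice can serialize itself into blobs. A minimal round-trip sketch, mirroring the
updated test in src/ledger.rs (the `roundtrip` wrapper is hypothetical; `Entry`, `Block`,
`BlobRecycler`, and `reconstruct_entries_from_blobs` are the crate items shown above):

    use entry::Entry;
    use ledger::{reconstruct_entries_from_blobs, Block};
    use packet::BlobRecycler;
    use std::collections::VecDeque;

    fn roundtrip(entries: &[Entry], blob_recycler: &BlobRecycler) -> Vec<Entry> {
        let mut blob_q = VecDeque::new();
        // Serialize the slice into blobs via the new trait method...
        entries.to_blobs(blob_recycler, &mut blob_q);
        // ...then deserialize them back; the result should equal `entries`.
        reconstruct_entries_from_blobs(&blob_q)
    }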
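Likewise, the new `FetchStage` is now the first stage that `Tpu::new` composes. A sketch of the
front-end wiring, following the `Tpu::new` body above (`wire_front_end` is hypothetical; the
stage types and constructors are the ones in this patch):

    use fetch_stage::FetchStage;
    use packet::PacketRecycler;
    use sigverify_stage::SigVerifyStage;
    use std::net::UdpSocket;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    fn wire_front_end(transactions_socket: UdpSocket, exit: Arc<AtomicBool>) -> SigVerifyStage {
        let packet_recycler = PacketRecycler::default();
        // FetchStage owns the receiver thread that reads the UDP socket;
        // its packet_receiver channel feeds the signature-verification stage.
        let fetch_stage =
            FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());
        SigVerifyStage::new(exit, fetch_stage.packet_receiver)
    }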