This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Initialize broadcaster to entry count #467

Merged

16 changes: 4 additions & 12 deletions src/bank.rs
@@ -87,11 +87,6 @@ pub struct Bank {
     /// The number of transactions the bank has processed without error since the
     /// start of the ledger.
     transaction_count: AtomicUsize,
-
-    /// The number of Entries the bank has processed without error since start
-    /// of the ledger, i.e. poor-man's network synchronization
-    /// TODO: upgrade to U64 when stable?
-    entry_count: AtomicUsize,
 }
 
 impl Bank {
@@ -105,7 +100,6 @@ impl Bank {
             time_sources: RwLock::new(HashSet::new()),
             last_time: RwLock::new(Utc.timestamp(0, 0)),
             transaction_count: AtomicUsize::new(0),
-            entry_count: AtomicUsize::new(0),
         };
         bank.apply_payment(deposit, &mut bank.balances.write().unwrap());
         bank
@@ -302,12 +296,13 @@ impl Bank {
     }
 
     /// Process an ordered list of entries.
-    pub fn process_entries<I>(&self, entries: I) -> Result<usize>
+    pub fn process_entries<I>(&self, entries: I) -> Result<u64>
     where
         I: IntoIterator<Item = Entry>,
     {
+        let mut entry_count = 0;
         for entry in entries {
-            self.entry_count.fetch_add(1, Ordering::Relaxed);
+            entry_count += 1;
 
             if !entry.transactions.is_empty() {
                 for result in self.process_transactions(entry.transactions) {
@@ -321,7 +316,7 @@ impl Bank {
                 self.register_entry_id(&entry.id);
             }
         }
-        Ok(self.entry_count())
+        Ok(entry_count)
     }
 
     /// Process a Witness Signature. Any payment plans waiting on this signature
@@ -435,9 +430,6 @@ impl Bank {
     pub fn transaction_count(&self) -> usize {
         self.transaction_count.load(Ordering::Relaxed)
     }
-    pub fn entry_count(&self) -> usize {
-        self.entry_count.load(Ordering::Relaxed)
-    }
 }
 
 #[cfg(test)]

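The bank.rs change replaces the Bank's shared entry counter with a running tally inside process_entries, which now returns the number of entries it just processed as a u64. Below is a minimal, self-contained sketch of that pattern; the real Bank, Entry, and per-entry transaction handling are stubbed out, so treat it as an illustration rather than the actual API.

```rust
// Sketch only: count entries locally and return the tally, instead of bumping a
// shared AtomicUsize and reading it back afterwards.
fn process_entries<I, T>(entries: I) -> Result<u64, String>
where
    I: IntoIterator<Item = T>,
{
    let mut entry_count = 0u64;
    for _entry in entries {
        entry_count += 1;
        // per-entry work (process transactions, register the entry id) elided
    }
    Ok(entry_count)
}

fn main() {
    let entries = vec!["e0", "e1", "e2"];
    let entry_height = process_entries(entries).expect("process_entries");
    assert_eq!(entry_height, 3);
}
```
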
8 changes: 6 additions & 2 deletions src/bin/fullnode.rs
@@ -101,9 +101,11 @@ fn main() {
     bank.register_entry_id(&entry0.id);
     bank.register_entry_id(&entry1.id);
 
+    // entry_height is the network-wide agreed height of the ledger.
+    // initialize it from the input ledger
     eprintln!("processing entries...");
-    let num_entries = bank.process_entries(entries).expect("process_entries");
-    eprintln!("processed {} entries...", num_entries);
+    let entry_height = bank.process_entries(entries).expect("process_entries");
+    eprintln!("processed {} entries...", entry_height);
 
     eprintln!("creating networking stack...");
 
@@ -135,6 +137,7 @@ fn main() {
     let newtwork_entry_point = ReplicatedData::new_entry_point(testnet_addr);
     let s = Server::new_validator(
         bank,
+        entry_height,
         repl_data.clone(),
         UdpSocket::bind(repl_data.requests_addr).unwrap(),
         UdpSocket::bind("0.0.0.0:0").unwrap(),
@@ -160,6 +163,7 @@ fn main() {
 
     let server = Server::new_leader(
         bank,
+        entry_height,
         //Some(Duration::from_millis(1000)),
         None,
         repl_data.clone(),

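In fullnode.rs the tally returned by process_entries becomes the node's starting entry height ("the network-wide agreed height of the ledger", per the new comment) and is handed to both Server constructors. The sketch below mirrors only that flow; Bank and Server here are hypothetical stand-ins, and the real constructors also take sockets, ReplicatedData, and other arguments that are elided.

```rust
// Hypothetical stand-ins; only the entry_height hand-off mirrors the diff.
struct Bank;

impl Bank {
    fn process_entries(&self, entries: Vec<()>) -> Result<u64, String> {
        Ok(entries.len() as u64)
    }
}

struct Server {
    entry_height: u64,
}

impl Server {
    fn new_validator(_bank: Bank, entry_height: u64) -> Server {
        // crdt, window, and socket setup elided
        Server { entry_height }
    }
}

fn main() {
    let bank = Bank;
    let ledger = vec![(), (), (), ()]; // stand-in for entries read from the input ledger
    let entry_height = bank.process_entries(ledger).expect("process_entries");
    eprintln!("processed {} entries...", entry_height);

    let server = Server::new_validator(bank, entry_height);
    assert_eq!(server.entry_height, 4);
}
```
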
5 changes: 4 additions & 1 deletion src/crdt.rs
@@ -727,7 +727,10 @@ impl Crdt {
             }
         } else {
             assert!(window.read().unwrap()[pos].is_none());
-            info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
+            info!(
+                "failed RequestWindowIndex {} {} {}",
+                ix, pos, from.repair_addr
+            );
         }
 
         None

1 change: 1 addition & 0 deletions src/drone.rs
@@ -248,6 +248,7 @@ mod tests {
 
         let server = Server::new_leader(
             bank,
+            0,
             Some(Duration::from_millis(30)),
             leader.data.clone(),
             leader.sockets.requests,

5 changes: 5 additions & 0 deletions src/server.rs
@@ -46,6 +46,7 @@ impl Server {
     /// ```
     pub fn new_leader<W: Write + Send + 'static>(
         bank: Bank,
+        entry_height: u64,
         tick_duration: Option<Duration>,
         me: ReplicatedData,
         requests_socket: UdpSocket,
@@ -89,6 +90,7 @@ impl Server {
             exit.clone(),
             crdt,
             window,
+            entry_height,
             blob_recycler.clone(),
             tpu.blob_receiver,
         );
@@ -128,6 +130,7 @@ impl Server {
     /// ```
     pub fn new_validator(
         bank: Bank,
+        entry_height: u64,
         me: ReplicatedData,
         requests_socket: UdpSocket,
         respond_socket: UdpSocket,
@@ -159,6 +162,7 @@ impl Server {
 
         let tvu = Tvu::new(
             bank.clone(),
+            entry_height,
             crdt.clone(),
             window.clone(),
             replicate_socket,
@@ -187,6 +191,7 @@ mod tests {
         let exit = Arc::new(AtomicBool::new(false));
         let v = Server::new_validator(
             bank,
+            0,
             tn.data.clone(),
             tn.sockets.requests,
             tn.sockets.respond,

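The server.rs changes are plumbing: new_leader and new_validator each gain an entry_height: u64 parameter and pass it straight through to the stage that owns the window indices (the broadcaster for a leader, the Tvu for a validator). A hedged sketch of that threading with made-up types, just to show the shape of the call chain:

```rust
// Made-up types; the point is only that the height flows through unchanged.
struct WindowStage {
    start_index: u64,
}

fn broadcaster(entry_height: u64 /* , crdt, window, recycler, receiver, ... */) -> WindowStage {
    WindowStage {
        start_index: entry_height,
    }
}

struct Server {
    stage: WindowStage,
}

impl Server {
    fn new_leader(entry_height: u64 /* , bank, tick_duration, sockets, ... */) -> Server {
        Server {
            stage: broadcaster(entry_height),
        }
    }
}

fn main() {
    let s = Server::new_leader(42);
    assert_eq!(s.stage.start_index, 42);
}
```
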
69 changes: 36 additions & 33 deletions src/streamer.rs
@@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
 use std::thread::{Builder, JoinHandle};
 use std::time::Duration;
 
-pub const WINDOW_SIZE: usize = 2 * 1024;
+pub const WINDOW_SIZE: u64 = 2 * 1024;
 pub type PacketReceiver = Receiver<SharedPackets>;
 pub type PacketSender = Sender<SharedPackets>;
 pub type BlobSender = Sender<SharedBlobs>;
@@ -148,16 +148,16 @@ pub fn blob_receiver(
 fn find_next_missing(
     locked_window: &Window,
     crdt: &Arc<RwLock<Crdt>>,
-    consumed: &mut usize,
-    received: &mut usize,
+    consumed: &mut u64,
+    received: &mut u64,
 ) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
     if *received <= *consumed {
         return Err(Error::GenericError);
     }
     let window = locked_window.read().unwrap();
     let reqs: Vec<_> = (*consumed..*received)
         .filter_map(|pix| {
-            let i = pix % WINDOW_SIZE;
+            let i = (pix % WINDOW_SIZE) as usize;
             if let &None = &window[i] {
                 let val = crdt.read().unwrap().window_index_request(pix as u64);
                 if let Ok((to, req)) = val {
@@ -174,18 +174,18 @@ fn repair_window(
     locked_window: &Window,
     crdt: &Arc<RwLock<Crdt>>,
     _recycler: &BlobRecycler,
-    last: &mut usize,
+    last: &mut u64,
     times: &mut usize,
-    consumed: &mut usize,
-    received: &mut usize,
+    consumed: &mut u64,
+    received: &mut u64,
 ) -> Result<()> {
     #[cfg(feature = "erasure")]
     {
         if erasure::recover(
             _recycler,
             &mut locked_window.write().unwrap(),
-            *consumed,
-            *received,
+            *consumed as usize,
+            *received as usize,
         ).is_err()
         {
             trace!("erasure::recover failed");
@@ -217,8 +217,8 @@ fn recv_window(
     locked_window: &Window,
     crdt: &Arc<RwLock<Crdt>>,
     recycler: &BlobRecycler,
-    consumed: &mut usize,
-    received: &mut usize,
+    consumed: &mut u64,
+    received: &mut u64,
     r: &BlobReceiver,
     s: &BlobSender,
     retransmit: &BlobSender,
@@ -273,7 +273,7 @@ fn recv_window(
     while let Some(b) = dq.pop_front() {
         let (pix, meta_size) = {
             let p = b.write().expect("'b' write lock in fn recv_window");
-            (p.get_index()? as usize, p.meta.size)
+            (p.get_index()?, p.meta.size)
         };
         if pix > *received {
             *received = pix;
@@ -287,7 +287,7 @@
             );
             continue;
         }
-        let w = pix % WINDOW_SIZE;
+        let w = (pix % WINDOW_SIZE) as usize;
         //TODO, after the block are authenticated
         //if we get different blocks at the same index
         //that is a network failure/attack
@@ -304,7 +304,7 @@
             }
         }
         loop {
-            let k = *consumed % WINDOW_SIZE;
+            let k = (*consumed % WINDOW_SIZE) as usize;
             trace!("k: {} consumed: {}", k, *consumed);
             if window[k].is_none() {
                 break;
@@ -330,19 +330,21 @@
            } else {
                #[cfg(feature = "erasure")]
                {
-                    let block_start = *consumed - (*consumed % erasure::NUM_CODED);
-                    let coding_end = block_start + erasure::NUM_CODED;
+                    let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
+                    let coding_end = block_start + erasure::NUM_CODED as u64;
                     // We've received all this block's data blobs, go and null out the window now
                     for j in block_start..*consumed {
-                        if let Some(b) = mem::replace(&mut window[j % WINDOW_SIZE], None) {
+                        if let Some(b) =
+                            mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None)
+                        {
                             recycler.recycle(b);
                         }
                     }
                     for j in *consumed..coding_end {
-                        window[j % WINDOW_SIZE] = None;
+                        window[(j % WINDOW_SIZE) as usize] = None;
                     }
 
-                    *consumed += erasure::MAX_MISSING;
+                    *consumed += erasure::MAX_MISSING as u64;
                     debug!(
                         "skipping processing coding blob k: {} consumed: {}",
                         k, *consumed
@@ -361,15 +363,15 @@
     Ok(())
 }
 
-fn print_window(locked_window: &Window, consumed: usize) {
+fn print_window(locked_window: &Window, consumed: u64) {
     {
         let buf: Vec<_> = locked_window
             .read()
             .unwrap()
             .iter()
             .enumerate()
             .map(|(i, v)| {
-                if i == (consumed % WINDOW_SIZE) {
+                if i == (consumed % WINDOW_SIZE) as usize {
                     "_"
                 } else if v.is_none() {
                     "0"
@@ -391,25 +393,25 @@
 }
 
 pub fn default_window() -> Window {
-    Arc::new(RwLock::new(vec![None; WINDOW_SIZE]))
+    Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
 }
 
 pub fn window(
     exit: Arc<AtomicBool>,
     crdt: Arc<RwLock<Crdt>>,
     window: Window,
+    entry_height: u64,
     recycler: BlobRecycler,
     r: BlobReceiver,
     s: BlobSender,
     retransmit: BlobSender,
-    entry_count: usize,
 ) -> JoinHandle<()> {
     Builder::new()
         .name("solana-window".to_string())
         .spawn(move || {
-            let mut consumed = entry_count;
-            let mut received = entry_count;
-            let mut last = entry_count;
+            let mut consumed = entry_height;
+            let mut received = entry_height;
+            let mut last = entry_height;
             let mut times = 0;
             loop {
                 if exit.load(Ordering::Relaxed) {
@@ -459,9 +461,9 @@ fn broadcast(
 
     // We could receive more blobs than window slots so
     // break them up into window-sized chunks to process
-    let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec());
+    let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
 
-    print_window(window, *receive_index as usize);
+    print_window(window, *receive_index);
 
     for mut blobs in blobs_chunked {
         // Insert the coding blobs into the blob stream
@@ -479,7 +481,7 @@
            assert!(blobs.len() <= win.len());
            for b in &blobs {
                let ix = b.read().unwrap().get_index().expect("blob index");
-                let pos = (ix as usize) % WINDOW_SIZE;
+                let pos = (ix % WINDOW_SIZE) as usize;
                if let Some(x) = mem::replace(&mut win[pos], None) {
                    trace!(
                        "popped {} at {}",
@@ -492,7 +494,7 @@
            }
            while let Some(b) = blobs.pop() {
                let ix = b.read().unwrap().get_index().expect("blob index");
-                let pos = (ix as usize) % WINDOW_SIZE;
+                let pos = (ix % WINDOW_SIZE) as usize;
                trace!("caching {} at {}", ix, pos);
                assert!(win[pos].is_none());
                win[pos] = Some(b);
@@ -531,14 +533,15 @@ pub fn broadcaster(
     exit: Arc<AtomicBool>,
     crdt: Arc<RwLock<Crdt>>,
     window: Window,
+    entry_height: u64,
     recycler: BlobRecycler,
     r: BlobReceiver,
 ) -> JoinHandle<()> {
     Builder::new()
         .name("solana-broadcaster".to_string())
         .spawn(move || {
-            let mut transmit_index = 0;
-            let mut receive_index = 0;
+            let mut transmit_index = entry_height;
+            let mut receive_index = entry_height;
             loop {
                 if exit.load(Ordering::Relaxed) {
                     break;
@@ -824,11 +827,11 @@ mod test {
         exit.clone(),
         subs,
         win,
+        0,
         resp_recycler.clone(),
         r_reader,
         s_window,
         s_retransmit,
-        0,
     );
     let (s_responder, r_responder) = channel();
     let t_responder = responder(

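Most of the streamer.rs diff is a type migration: blob indices and WINDOW_SIZE become u64, and only the ring-buffer slot is cast to usize at the point of indexing. On top of that, the window and broadcaster threads now start their consumed/received/transmit counters at entry_height instead of 0, which is the behavior the PR title describes. A small self-contained sketch of both ideas follows; WINDOW_SIZE mirrors the constant in this file, everything else is illustrative.

```rust
// Indices grow monotonically with the ledger, so they are u64; only the slot into the
// fixed-size ring buffer is a usize, via the `(ix % WINDOW_SIZE) as usize` pattern.
const WINDOW_SIZE: u64 = 2 * 1024;

fn window_slot(ix: u64) -> usize {
    (ix % WINDOW_SIZE) as usize
}

// Broadcaster-style state: counters start at the ledger's entry height, not at 0.
struct Broadcast {
    transmit_index: u64,
    receive_index: u64,
}

impl Broadcast {
    fn new(entry_height: u64) -> Self {
        Broadcast {
            transmit_index: entry_height,
            receive_index: entry_height,
        }
    }
}

fn main() {
    let mut window: Vec<Option<u64>> = vec![None; WINDOW_SIZE as usize];
    let b = Broadcast::new(2040);
    assert_eq!(b.transmit_index, 2040);
    // Indices keep growing past WINDOW_SIZE; the slot wraps around.
    for ix in b.receive_index..b.receive_index + 20 {
        window[window_slot(ix)] = Some(ix);
    }
    assert_eq!(window_slot(2048), 0);
    assert_eq!(window[0], Some(2048));
}
```
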
2 changes: 2 additions & 0 deletions src/thin_client.rs
@@ -205,6 +205,7 @@ mod tests {
 
         let server = Server::new_leader(
             bank,
+            0,
             Some(Duration::from_millis(30)),
             leader.data.clone(),
             leader.sockets.requests,
@@ -249,6 +250,7 @@ mod tests {
 
         let server = Server::new_leader(
             bank,
+            0,
             Some(Duration::from_millis(30)),
             leader.data.clone(),
             leader.sockets.requests,