-
Notifications
You must be signed in to change notification settings - Fork 4.7k
Add push and get methods for RestartLastVotedForkSlots #33613
Changes from 1 commit
dcccabb
75018a2
ad641b1
0924388
ead61eb
c1bf8bf
2b181c7
513116e
6491d5f
d78a00e
03056b6
e788b3e
756d9f2
7920971
73cd8b7
07f3ed2
f4df7f3
6155485
6107deb
def82a0
4407db4
dd8181c
692f78e
9bf13d6
eb15284
f8e2437
5f54d95
4a66754
789d3d1
1ded7a8
be92ca6
3352222
60e9a3c
99c19f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ use { | |
}, | ||
crds_value::{ | ||
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, | ||
NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK, | ||
NodeInstance, RestartLastVotedForkSlots, SnapshotHashes, Version, Vote, MAX_WALLCLOCK, | ||
}, | ||
duplicate_shred::DuplicateShred, | ||
epoch_slots::EpochSlots, | ||
|
@@ -965,6 +965,21 @@ impl ClusterInfo { | |
} | ||
} | ||
|
||
pub fn push_restart_last_voted_fork_slots(&self, update: &[Slot], last_vote_bankhash: Hash) { | ||
let now = timestamp(); | ||
let mut last_voted_fork_slots = RestartLastVotedForkSlots::new( | ||
self.id(), | ||
now, | ||
last_vote_bankhash, | ||
self.my_shred_version(), | ||
); | ||
last_voted_fork_slots.fill(update); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this 2 stage initialization? also, the update logic is quite different from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we only have one packet per validator, yes the value with newer timestamp should always replace the old one. The 2 stage initialization was still there because we need self.max_compressed_slot_size(), which can't be called in a constructor. But maybe we can approximate assuming header etc is around 100+? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this initialization, Also looking at the code, when there are too many slots, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, actually I don't care too much about time complexity here, because it is only called once on start. So I can try to pack in as many slots as possible into one packet. Changed. |
||
self.push_message(CrdsValue::new_signed( | ||
CrdsData::RestartLastVotedForkSlots(last_voted_fork_slots), | ||
&self.keypair(), | ||
)); | ||
} | ||
|
||
fn time_gossip_read_lock<'a>( | ||
&'a self, | ||
label: &'static str, | ||
|
@@ -1217,6 +1232,27 @@ impl ClusterInfo { | |
.collect() | ||
} | ||
|
||
pub fn get_restart_last_voted_fork_slots( | ||
&self, | ||
cursor: &mut Cursor, | ||
) -> Vec<RestartLastVotedForkSlots> { | ||
let self_shred_version = self.my_shred_version(); | ||
let gossip_crds = self.gossip.crds.read().unwrap(); | ||
gossip_crds | ||
.get_entries(cursor) | ||
.filter_map(|entry| match &entry.value.data { | ||
CrdsData::RestartLastVotedForkSlots(slots) => { | ||
if slots.shred_version == self_shred_version { | ||
Some(slots.clone()) | ||
} else { | ||
None | ||
} | ||
} | ||
_ => None, | ||
}) | ||
wen-coding marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.collect() | ||
} | ||
|
||
/// Returns duplicate-shreds inserted since the given cursor. | ||
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> { | ||
let gossip_crds = self.gossip.crds.read().unwrap(); | ||
|
@@ -3162,6 +3198,7 @@ mod tests { | |
crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS, | ||
crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}, | ||
duplicate_shred::{self, tests::new_rand_shred, MAX_DUPLICATE_SHREDS}, | ||
epoch_slots::MAX_SLOTS_PER_ENTRY, | ||
}, | ||
itertools::izip, | ||
solana_ledger::shred::Shredder, | ||
|
@@ -4559,4 +4596,70 @@ mod tests { | |
assert_eq!(shred_data.chunk_index() as usize, i); | ||
} | ||
} | ||
|
||
#[test]
fn test_push_restart_last_voted_fork_slots() {
    let keypair = Arc::new(Keypair::new());
    let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
    let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);

    // Nothing pushed yet, so nothing should be retrievable.
    assert!(cluster_info
        .get_restart_last_voted_fork_slots(&mut Cursor::default())
        .is_empty());

    // Slot 0 plus 81 runs of 1000 consecutive slots, each run offset by
    // 1050 — more slots than a single entry can hold.
    let update: Vec<Slot> = std::iter::once(0)
        .chain((0..81).flat_map(|i| (0..1000).map(move |j| i * 1050 + j)))
        .collect();
    cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default());
    cluster_info.flush_push_queue();

    let mut cursor = Cursor::default();
    let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor);
    assert_eq!(slots.len(), 1);
    let retrieved_slots = slots[0].to_slots(0);
    // Capacity caps the entry at MAX_SLOTS_PER_ENTRY; the retained range
    // is the tail of the update (newest slots kept).
    assert_eq!(retrieved_slots.len(), MAX_SLOTS_PER_ENTRY);
    assert_eq!(retrieved_slots[0], 67816);
    assert_eq!(retrieved_slots.last(), Some(84999).as_ref());

    // The cursor advanced past the entry, so a second read yields nothing.
    assert!(cluster_info
        .get_restart_last_voted_fork_slots(&mut cursor)
        .is_empty());

    // Insert a value from another node carrying a different shred version.
    let mut rng = rand::thread_rng();
    let node_pubkey = Pubkey::new_unique();
    let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
    node.set_shred_version(42);
    let mut slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey));
    slots.shred_version = 42;
    let entries = vec![
        CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)),
        CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(slots)),
    ];
    {
        let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
        for entry in entries {
            assert!(gossip_crds
                .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
                .is_ok());
        }
    }
    // The other node's value is filtered out: its shred version (42) does
    // not match ours.
    let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
    assert_eq!(slots.len(), 1);
    assert_eq!(slots[0].from, cluster_info.id());

    // Adopt shred version 42 ourselves and push again; now both values
    // pass the filter.
    {
        let mut node = cluster_info.my_contact_info.write().unwrap();
        node.set_shred_version(42);
    }
    cluster_info.push_restart_last_voted_fork_slots(&update, Hash::default());
    cluster_info.flush_push_queue();
    let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default());
    assert_eq!(slots.len(), 2);
    assert_eq!(slots[0].from, node_pubkey);
    assert_eq!(slots[1].from, cluster_info.id());
}
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`update` is pretty ambiguous here. Can we name the arg `fork`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.