Skip to content

Commit

Permalink
Fixing missing pubsub notification for programSubscribe and logsSubsc…
Browse files Browse the repository at this point in the history
…ribe (solana-labs#19092)

Not all rooted slots get OptimisticallyConfirmed notifications
The OptimisticallyConfirmed notifications can be out of order for slots: slot A and B with A < B can see notification for B first before A.
Summary of Changes

Changed OptimisticallyConfirmedBankTracker to send notifications for parent banks if they have not been notified yet. We use a new variable last_notified_slot to track that.

Tests:
With my validator running against testnet, before the fix, it was failing 75% of time, with the fix, it is passing consistently. Using the program mentioned in solana-labs#18587.
  • Loading branch information
lijunwangs authored and Tyera Eulberg committed Aug 21, 2021
1 parent b570b64 commit 59ad4a6
Show file tree
Hide file tree
Showing 3 changed files with 588 additions and 11 deletions.
125 changes: 115 additions & 10 deletions core/src/optimistically_confirmed_bank_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ impl OptimisticallyConfirmedBankTracker {
) -> Self {
let exit_ = exit.clone();
let mut pending_optimistically_confirmed_banks = HashSet::new();
let mut last_notified_confirmed_slot: Slot = 0;
let mut highest_confirmed_slot: Slot = 0;
let thread_hdl = Builder::new()
.name("solana-optimistic-bank-tracker".to_string())
.spawn(move || loop {
Expand All @@ -77,6 +79,8 @@ impl OptimisticallyConfirmedBankTracker {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
) {
break;
}
Expand All @@ -91,6 +95,8 @@ impl OptimisticallyConfirmedBankTracker {
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut last_notified_confirmed_slot: &mut Slot,
mut highest_confirmed_slot: &mut Slot,
) -> Result<(), RecvTimeoutError> {
let notification = receiver.recv_timeout(Duration::from_secs(1))?;
Self::process_notification(
Expand All @@ -99,31 +105,91 @@ impl OptimisticallyConfirmedBankTracker {
optimistically_confirmed_bank,
subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
Ok(())
}

fn notify_or_defer(
subscriptions: &Arc<RpcSubscriptions>,
bank_forks: &Arc<RwLock<BankForks>>,
bank: &Arc<Bank>,
last_notified_confirmed_slot: &mut Slot,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
if bank.is_frozen() {
if bank.slot() > *last_notified_confirmed_slot {
debug!(
"notify_or_defer notifying via notify_gossip_subscribers for slot {:?}",
bank.slot()
);
subscriptions.notify_gossip_subscribers(bank.slot());
*last_notified_confirmed_slot = bank.slot();
}
} else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() {
pending_optimistically_confirmed_banks.insert(bank.slot());
debug!("notify_or_defer defer notifying for slot {:?}", bank.slot());
}
}

fn notify_or_defer_confirmed_banks(
subscriptions: &Arc<RpcSubscriptions>,
bank_forks: &Arc<RwLock<BankForks>>,
bank: &Arc<Bank>,
slot_threshold: Slot,
mut last_notified_confirmed_slot: &mut Slot,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
for confirmed_bank in bank.clone().parents_inclusive().iter().rev() {
if confirmed_bank.slot() > slot_threshold {
debug!(
"Calling notify_or_defer for confirmed_bank {:?}",
confirmed_bank.slot()
);
Self::notify_or_defer(
subscriptions,
bank_forks,
confirmed_bank,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);
}
}
}

pub(crate) fn process_notification(
notification: BankNotification,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut last_notified_confirmed_slot: &mut Slot,
highest_confirmed_slot: &mut Slot,
) {
debug!("received bank notification: {:?}", notification);
match notification {
BankNotification::OptimisticallyConfirmed(slot) => {
if let Some(bank) = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
{
if let Some(bank) = bank_forks.read().unwrap().get(slot) {
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if bank.slot() > w_optimistically_confirmed_bank.bank.slot() {

if bank.slot() > w_optimistically_confirmed_bank.bank.slot() && bank.is_frozen()
{
w_optimistically_confirmed_bank.bank = bank.clone();
subscriptions.notify_gossip_subscribers(slot);
}

if slot > *highest_confirmed_slot {
Self::notify_or_defer_confirmed_banks(
subscriptions,
bank_forks,
bank,
*highest_confirmed_slot,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);

*highest_confirmed_slot = slot;
}
drop(w_optimistically_confirmed_bank);
} else if slot > bank_forks.read().unwrap().root_bank().slot() {
Expand Down Expand Up @@ -157,11 +223,24 @@ impl OptimisticallyConfirmedBankTracker {
}

if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
debug!(
"Calling notify_gossip_subscribers to send deferred notification {:?}",
frozen_slot
);

Self::notify_or_defer_confirmed_banks(
subscriptions,
bank_forks,
&bank,
*last_notified_confirmed_slot,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);

let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
subscriptions.notify_gossip_subscribers(frozen_slot);
}
drop(w_optimistically_confirmed_bank);
}
Expand Down Expand Up @@ -227,14 +306,19 @@ mod tests {

assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);

let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(highest_confirmed_slot, 2);

// Test max optimistically confirmed bank remains in the cache
OptimisticallyConfirmedBankTracker::process_notification(
Expand All @@ -243,8 +327,11 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(highest_confirmed_slot, 2);

// Test bank will only be cached when frozen
OptimisticallyConfirmedBankTracker::process_notification(
Expand All @@ -253,21 +340,30 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&3), true);
assert_eq!(highest_confirmed_slot, 3);

// Test bank will only be cached when frozen
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
bank3.freeze();

OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(highest_confirmed_slot, 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);

// Test higher root will be cached and clear pending_optimistically_confirmed_banks
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
Expand All @@ -279,10 +375,13 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), true);
assert_eq!(highest_confirmed_slot, 4);

let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone();
let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5);
Expand All @@ -294,10 +393,13 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&4), false);
assert_eq!(highest_confirmed_slot, 4);

// Banks <= root do not get added to pending list, even if not frozen
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
Expand All @@ -316,9 +418,12 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert_eq!(pending_optimistically_confirmed_banks.contains(&6), false);
assert_eq!(highest_confirmed_slot, 4);
}
}
10 changes: 10 additions & 0 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7364,13 +7364,17 @@ pub mod tests {
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 0);
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;

OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
Expand All @@ -7386,6 +7390,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
Expand All @@ -7401,6 +7407,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
Expand All @@ -7417,6 +7425,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
Expand Down
Loading

0 comments on commit 59ad4a6

Please sign in to comment.