diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
index 3ec47a3e1b07d3..bc7483ce1a487b 100644
--- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -15,7 +15,10 @@ use {
     crossbeam_channel::{Receiver, Sender, TryRecvError},
     itertools::izip,
     prio_graph::{AccessKind, PrioGraph},
-    solana_sdk::{pubkey::Pubkey, slot_history::Slot, transaction::SanitizedTransaction},
+    solana_sdk::{
+        pubkey::Pubkey, saturating_add_assign, slot_history::Slot,
+        transaction::SanitizedTransaction,
+    },
     std::collections::HashMap,
 };
 
@@ -43,7 +46,7 @@ impl PrioGraphScheduler {
     }
 
     /// Schedule transactions from the given `TransactionStateContainer` to be consumed by the
-    /// worker threads. Returns the number of transactions scheduled, or an error.
+    /// worker threads. Returns a summary of scheduling, or an error.
     ///
     /// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions.
     /// This, combined with internal tracking of threads' in-flight transactions, allows
@@ -52,7 +55,7 @@ impl PrioGraphScheduler {
     pub(crate) fn schedule(
         &mut self,
         container: &mut TransactionStateContainer,
-    ) -> Result<usize, SchedulerError> {
+    ) -> Result<SchedulingSummary, SchedulerError> {
         let num_threads = self.consume_work_senders.len();
         let mut batches = Batches::new(num_threads);
         let mut chain_id_to_thread_index = HashMap::new();
@@ -77,8 +80,9 @@ impl PrioGraphScheduler {
         let mut unblock_this_batch =
             Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
         const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000;
-        let mut num_scheduled = 0;
-        let mut num_sent = 0;
+        let mut num_scheduled: usize = 0;
+        let mut num_sent: usize = 0;
+        let mut num_unschedulable: usize = 0;
         while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
             // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
             if prio_graph.is_empty() {
@@ -109,6 +113,7 @@ impl PrioGraphScheduler {
                 if !blocking_locks.check_locks(transaction.message()) {
                     blocking_locks.take_locks(transaction.message());
                     unschedulable_ids.push(id);
+                    saturating_add_assign!(num_unschedulable, 1);
                     continue;
                 }
 
@@ -133,10 +138,11 @@ impl PrioGraphScheduler {
                 ) else {
                     blocking_locks.take_locks(transaction.message());
                     unschedulable_ids.push(id);
+                    saturating_add_assign!(num_unschedulable, 1);
                     continue;
                 };
 
-                num_scheduled += 1;
+                saturating_add_assign!(num_scheduled, 1);
 
                 // Track the chain-id to thread-index mapping.
                 chain_id_to_thread_index.insert(prio_graph.chain_id(&id), thread_id);
@@ -154,11 +160,11 @@ impl PrioGraphScheduler {
                 batches.transactions[thread_id].push(transaction);
                 batches.ids[thread_id].push(id.id);
                 batches.max_age_slots[thread_id].push(max_age_slot);
-                batches.total_cus[thread_id] += cu_limit;
+                saturating_add_assign!(batches.total_cus[thread_id], cu_limit);
 
                 // If target batch size is reached, send only this batch.
                 if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH {
-                    num_sent += self.send_batch(&mut batches, thread_id)?;
+                    saturating_add_assign!(num_sent, self.send_batch(&mut batches, thread_id)?);
                 }
 
                 if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
@@ -167,7 +173,7 @@ impl PrioGraphScheduler {
             }
 
             // Send all non-empty batches
-            num_sent += self.send_batches(&mut batches)?;
+            saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);
 
             // Unblock all transactions that were blocked by the transactions that were just sent.
             for id in unblock_this_batch.drain(..) {
@@ -176,7 +182,7 @@ impl PrioGraphScheduler {
         }
 
         // Send batches for any remaining transactions
-        num_sent += self.send_batches(&mut batches)?;
+        saturating_add_assign!(num_sent, self.send_batches(&mut batches)?);
 
         // Push unschedulable ids back into the container
         for id in unschedulable_ids {
@@ -193,7 +199,10 @@ impl PrioGraphScheduler {
             "number of scheduled and sent transactions must match"
         );
 
-        Ok(num_scheduled)
+        Ok(SchedulingSummary {
+            num_scheduled,
+            num_unschedulable,
+        })
     }
 
     /// Receive completed batches of transactions without blocking.
@@ -202,15 +211,15 @@ impl PrioGraphScheduler {
         &mut self,
         container: &mut TransactionStateContainer,
    ) -> Result<(usize, usize), SchedulerError> {
-        let mut total_num_transactions = 0;
-        let mut total_num_retryable = 0;
+        let mut total_num_transactions: usize = 0;
+        let mut total_num_retryable: usize = 0;
         loop {
             let (num_transactions, num_retryable) = self.try_receive_completed(container)?;
             if num_transactions == 0 {
                 break;
             }
-            total_num_transactions += num_transactions;
-            total_num_retryable += num_retryable;
+            saturating_add_assign!(total_num_transactions, num_transactions);
+            saturating_add_assign!(total_num_retryable, num_retryable);
         }
         Ok((total_num_transactions, total_num_retryable))
     }
@@ -377,6 +386,15 @@ impl PrioGraphScheduler {
     }
 }
 
+/// Metrics from scheduling transactions.
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct SchedulingSummary {
+    /// Number of transactions scheduled.
+    pub num_scheduled: usize,
+    /// Number of transactions that were not scheduled due to conflicts.
+    pub num_unschedulable: usize,
+}
+
 struct Batches {
     ids: Vec<Vec<TransactionId>>,
     transactions: Vec<Vec<SanitizedTransaction>>,
@@ -553,8 +571,9 @@ mod tests {
             (&Keypair::new(), &[Pubkey::new_unique()], 2, 2),
         ]);
 
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 2);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 2);
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
         assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]);
     }
 
@@ -567,8 +586,9 @@ mod tests {
             (&Keypair::new(), &[pubkey], 1, 2),
         ]);
 
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 2);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 2);
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
         assert_eq!(
             collect_work(&work_receivers[0]).1,
             vec![txids!([1]), txids!([0])]
@@ -584,8 +604,12 @@ mod tests {
         );
 
         // expect 4 full batches to be scheduled
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 4 * TARGET_NUM_TRANSACTIONS_PER_BATCH);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(
+            scheduling_summary.num_scheduled,
+            4 * TARGET_NUM_TRANSACTIONS_PER_BATCH
+        );
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
 
         let thread0_work_counts: Vec<_> = work_receivers[0]
             .try_iter()
@@ -600,8 +624,9 @@ mod tests {
         let mut container =
             create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i)));
 
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 4);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 4);
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
         assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
         assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]);
     }
@@ -631,8 +656,9 @@ mod tests {
         // fact they eventually join means that the scheduler will schedule them
         // onto the same thread to avoid causing [4], which conflicts with both
         // chains, to be un-schedulable.
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 5);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 5);
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
         assert_eq!(
             collect_work(&work_receivers[0]).1,
             [txids!([0, 2]), txids!([1, 3]), txids!([4])]
@@ -671,15 +697,17 @@ mod tests {
         // Because the look-ahead window is shortened to a size of 4, the scheduler does
         // not have knowledge of the joining at transaction [4] until after [0] and [1]
         // have been scheduled.
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 4);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 4);
+        assert_eq!(scheduling_summary.num_unschedulable, 2);
         let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]);
         assert_eq!(thread_0_ids, [txids!([0, 2])]);
         assert_eq!(collect_work(&work_receivers[1]).1, [txids!([1, 3])]);
 
         // Cannot schedule even on next pass because of lock conflicts
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 0);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 0);
+        assert_eq!(scheduling_summary.num_unschedulable, 2);
 
         // Complete batch on thread 0. Remaining txs can be scheduled onto thread 1
         finished_work_sender
@@ -689,8 +717,9 @@ mod tests {
             })
             .unwrap();
         scheduler.receive_completed(&mut container).unwrap();
-        let num_scheduled = scheduler.schedule(&mut container).unwrap();
-        assert_eq!(num_scheduled, 2);
+        let scheduling_summary = scheduler.schedule(&mut container).unwrap();
+        assert_eq!(scheduling_summary.num_scheduled, 2);
+        assert_eq!(scheduling_summary.num_unschedulable, 0);
 
         assert_eq!(
             collect_work(&work_receivers[1]).1,
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index 8c92f235b5a741..5b7b69a89b1712 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -112,9 +112,16 @@ impl SchedulerController {
     ) -> Result<(), SchedulerError> {
         match decision {
             BufferedPacketsDecision::Consume(_bank_start) => {
-                let (num_scheduled, schedule_time_us) =
+                let (scheduling_summary, schedule_time_us) =
                     measure_us!(self.scheduler.schedule(&mut self.container)?);
-                saturating_add_assign!(self.count_metrics.num_scheduled, num_scheduled);
+                saturating_add_assign!(
+                    self.count_metrics.num_scheduled,
+                    scheduling_summary.num_scheduled
+                );
+                saturating_add_assign!(
+                    self.count_metrics.num_unschedulable,
+                    scheduling_summary.num_unschedulable
+                );
                 saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
             }
             BufferedPacketsDecision::Forward => {
@@ -253,6 +260,8 @@ struct SchedulerCountMetrics {
 
     /// Number of transactions scheduled.
     num_scheduled: usize,
+    /// Number of transactions that were unschedulable.
+    num_unschedulable: usize,
     /// Number of completed transactions received from workers.
     num_finished: usize,
     /// Number of transactions that were retryable.
@@ -287,6 +296,7 @@ impl SchedulerCountMetrics {
             ("num_received", self.num_received, i64),
             ("num_buffered", self.num_buffered, i64),
             ("num_scheduled", self.num_scheduled, i64),
+            ("num_unschedulable", self.num_unschedulable, i64),
             ("num_finished", self.num_finished, i64),
             ("num_retryable", self.num_retryable, i64),
             ("num_dropped_on_receive", self.num_dropped_on_receive, i64),
@@ -309,6 +319,7 @@ impl SchedulerCountMetrics {
         self.num_received != 0
             || self.num_buffered != 0
             || self.num_scheduled != 0
+            || self.num_unschedulable != 0
             || self.num_finished != 0
             || self.num_retryable != 0
             || self.num_dropped_on_receive != 0
@@ -322,6 +333,7 @@ impl SchedulerCountMetrics {
         self.num_received = 0;
         self.num_buffered = 0;
         self.num_scheduled = 0;
+        self.num_unschedulable = 0;
         self.num_finished = 0;
         self.num_retryable = 0;
         self.num_dropped_on_receive = 0;
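Illustration (not part of the patch): the change above replaces the bare scheduled-transaction count with a `SchedulingSummary` and folds both of its counters into `SchedulerCountMetrics` via the `saturating_add_assign!` macro. The standalone Rust sketch below mirrors that accumulation pattern with hypothetical `CountMetrics` and `record` names and plain `usize::saturating_add`, purely to show how a caller might consume the summary; the real wiring is exactly what the diff to `scheduler_controller.rs` shows.

// Standalone sketch, not the crate's actual code: fold a per-pass
// `SchedulingSummary` into running interval counters with saturating adds.
#[derive(Debug, Default, PartialEq, Eq)]
struct SchedulingSummary {
    num_scheduled: usize,
    num_unschedulable: usize,
}

#[derive(Debug, Default)]
struct CountMetrics {
    num_scheduled: usize,
    num_unschedulable: usize,
}

impl CountMetrics {
    // Record one scheduling pass; saturating_add avoids wrap-around if the
    // counters are accumulated for a long interval without being reset.
    fn record(&mut self, summary: &SchedulingSummary) {
        self.num_scheduled = self.num_scheduled.saturating_add(summary.num_scheduled);
        self.num_unschedulable = self
            .num_unschedulable
            .saturating_add(summary.num_unschedulable);
    }
}

fn main() {
    let mut metrics = CountMetrics::default();
    // Two hypothetical scheduling passes.
    metrics.record(&SchedulingSummary { num_scheduled: 4, num_unschedulable: 2 });
    metrics.record(&SchedulingSummary { num_scheduled: 2, num_unschedulable: 0 });
    assert_eq!(metrics.num_scheduled, 6);
    assert_eq!(metrics.num_unschedulable, 2);
}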