diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 158614b32d7963..652f2569f8fd43 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -659,7 +659,10 @@ impl BankingStage { } let (decision, make_decision_time) = measure!(decision_maker.make_consume_or_forward_decision()); - let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start()); + let metrics_action = slot_metrics_tracker.check_leader_slot_boundary( + decision.bank_start(), + Some(unprocessed_transaction_storage), + ); slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us()); match decision { diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index d3a53aa42e91b8..7744a399e565bc 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -212,6 +212,8 @@ impl ConsumeWorkerMetrics { retryable_transaction_indexes, execute_and_commit_timings, error_counters, + min_prioritization_fees, + max_prioritization_fees, .. }: &ExecuteAndCommitTransactionsOutput, ) { @@ -227,7 +229,20 @@ impl ConsumeWorkerMetrics { self.count_metrics .retryable_transaction_count .fetch_add(retryable_transaction_indexes.len(), Ordering::Relaxed); - + let min_prioritization_fees = self + .count_metrics + .min_prioritization_fees + .fetch_min(*min_prioritization_fees, Ordering::Relaxed); + let max_prioritization_fees = self + .count_metrics + .max_prioritization_fees + .fetch_max(*max_prioritization_fees, Ordering::Relaxed); + self.count_metrics + .min_prioritization_fees + .swap(min_prioritization_fees, Ordering::Relaxed); + self.count_metrics + .max_prioritization_fees + .swap(max_prioritization_fees, Ordering::Relaxed); self.update_on_execute_and_commit_timings(execute_and_commit_timings); self.update_on_error_counters(error_counters); } @@ -368,7 +383,6 @@ impl ConsumeWorkerMetrics { } } -#[derive(Default)] struct ConsumeWorkerCountMetrics { transactions_attempted_execution_count: AtomicUsize, executed_transactions_count: AtomicUsize, @@ -376,6 +390,23 @@ struct ConsumeWorkerCountMetrics { retryable_transaction_count: AtomicUsize, retryable_expired_bank_count: AtomicUsize, cost_model_throttled_transactions_count: AtomicUsize, + min_prioritization_fees: AtomicU64, + max_prioritization_fees: AtomicU64, +} + +impl Default for ConsumeWorkerCountMetrics { + fn default() -> Self { + Self { + transactions_attempted_execution_count: AtomicUsize::default(), + executed_transactions_count: AtomicUsize::default(), + executed_with_successful_result_count: AtomicUsize::default(), + retryable_transaction_count: AtomicUsize::default(), + retryable_expired_bank_count: AtomicUsize::default(), + cost_model_throttled_transactions_count: AtomicUsize::default(), + min_prioritization_fees: AtomicU64::new(u64::MAX), + max_prioritization_fees: AtomicU64::default(), + } + } } impl ConsumeWorkerCountMetrics { @@ -416,6 +447,17 @@ impl ConsumeWorkerCountMetrics { .swap(0, Ordering::Relaxed), i64 ), + ( + "min_prioritization_fees", + self.min_prioritization_fees + .swap(u64::MAX, Ordering::Relaxed), + i64 + ), + ( + "max_prioritization_fees", + self.max_prioritization_fees.swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 64b68889747633..ed586b4f08729e 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -26,6 +26,7 @@ use { accounts::validate_fee_payer, bank::{Bank, LoadAndExecuteTransactionsOutput}, transaction_batch::TransactionBatch, + transaction_priority_details::GetTransactionPriorityDetails, }, solana_sdk::{ clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, @@ -69,6 +70,8 @@ pub struct ExecuteAndCommitTransactionsOutput { pub commit_transactions_result: Result, PohRecorderError>, pub(crate) execute_and_commit_timings: LeaderExecuteAndCommitTimings, pub(crate) error_counters: TransactionErrorMetrics, + pub(crate) min_prioritization_fees: u64, + pub(crate) max_prioritization_fees: u64, } pub struct Consumer { @@ -291,6 +294,8 @@ impl Consumer { let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut total_error_counters = TransactionErrorMetrics::default(); let mut reached_max_poh_height = false; + let mut overall_min_prioritization_fees: u64 = u64::MAX; + let mut overall_max_prioritization_fees: u64 = 0; while chunk_start != transactions.len() { let chunk_end = std::cmp::min( transactions.len(), @@ -321,6 +326,8 @@ impl Consumer { commit_transactions_result: new_commit_transactions_result, execute_and_commit_timings: new_execute_and_commit_timings, error_counters: new_error_counters, + min_prioritization_fees, + max_prioritization_fees, .. } = execute_and_commit_transactions_output; @@ -330,6 +337,10 @@ impl Consumer { total_transactions_attempted_execution_count, new_transactions_attempted_execution_count ); + overall_min_prioritization_fees = + std::cmp::min(overall_min_prioritization_fees, min_prioritization_fees); + overall_max_prioritization_fees = + std::cmp::min(overall_max_prioritization_fees, max_prioritization_fees); trace!( "process_transactions result: {:?}", @@ -390,6 +401,8 @@ impl Consumer { cost_model_us: total_cost_model_us, execute_and_commit_timings: total_execute_and_commit_timings, error_counters: total_error_counters, + min_prioritization_fees: overall_min_prioritization_fees, + max_prioritization_fees: overall_max_prioritization_fees, } } @@ -565,6 +578,19 @@ impl Consumer { }); execute_and_commit_timings.collect_balances_us = collect_balances_us; + let min_max = batch + .sanitized_transactions() + .iter() + .filter_map(|transaction| { + let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set + transaction + .get_transaction_priority_details(round_compute_unit_price_enabled) + .map(|details| details.priority) + }) + .minmax(); + let (min_prioritization_fees, max_prioritization_fees) = + min_max.into_option().unwrap_or_default(); + let (load_and_execute_transactions_output, load_execute_us) = measure_us!(bank .load_and_execute_transactions( batch, @@ -646,6 +672,8 @@ impl Consumer { commit_transactions_result: Err(recorder_err), execute_and_commit_timings, error_counters, + min_prioritization_fees, + max_prioritization_fees, }; } @@ -701,6 +729,8 @@ impl Consumer { commit_transactions_result: Ok(commit_transaction_statuses), execute_and_commit_timings, error_counters, + min_prioritization_fees, + max_prioritization_fees, } } diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index 449ff7801991fa..1e250c5b69a17b 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -1,7 +1,9 @@ use { super::{ leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics}, - unprocessed_transaction_storage::InsertPacketBatchSummary, + unprocessed_transaction_storage::{ + InsertPacketBatchSummary, UnprocessedTransactionStorage, + }, }, solana_accounts_db::transaction_error_metrics::*, solana_poh::poh_recorder::BankStart, @@ -52,6 +54,53 @@ pub(crate) struct ProcessTransactionsSummary { // Breakdown of all the transaction errors from transactions passed for execution pub error_counters: TransactionErrorMetrics, + + pub min_prioritization_fees: u64, + pub max_prioritization_fees: u64, +} + +// Metrics describing prioritization fee information for each transaction storage before processing transactions +#[derive(Debug, Default)] +struct LeaderPrioritizationFeesMetrics { + // minimum prioritization fees in the MinMaxHeap + min_prioritization_fees_per_cu: u64, + // maximum prioritization fees in the MinMaxHeap + max_prioritization_fees_per_cu: u64, +} + +impl LeaderPrioritizationFeesMetrics { + fn new(unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>) -> Self { + if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage { + Self { + min_prioritization_fees_per_cu: unprocessed_transaction_storage + .get_min_priority() + .unwrap_or_default(), + max_prioritization_fees_per_cu: unprocessed_transaction_storage + .get_max_priority() + .unwrap_or_default(), + } + } else { + Self::default() + } + } + + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-leader_prioritization_fees_info", + ("id", id, i64), + ("slot", slot, i64), + ( + "min_prioritization_fees_per_cu", + self.min_prioritization_fees_per_cu, + i64 + ), + ( + "max_prioritization_fees_per_cu", + self.max_prioritization_fees_per_cu, + i64 + ) + ); + } } // Metrics describing packets ingested/processed in various parts of BankingStage during this @@ -138,6 +187,11 @@ struct LeaderSlotPacketCountMetrics { // total number of forwardable batches that were attempted for forwarding. A forwardable batch // is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs` forwardable_batches_count: u64, + + // min prioritization fees for scheduled transactions + min_prioritization_fees: u64, + // max prioritization fees for scheduled transactions + max_prioritization_fees: u64, } impl LeaderSlotPacketCountMetrics { @@ -255,6 +309,16 @@ impl LeaderSlotPacketCountMetrics { self.end_of_slot_unprocessed_buffer_len as i64, i64 ), + ( + "min_prioritization_fees", + self.min_prioritization_fees as i64, + i64 + ), + ( + "max_prioritization_fees", + self.max_prioritization_fees as i64, + i64 + ), ); } } @@ -277,12 +341,19 @@ pub(crate) struct LeaderSlotMetrics { timing_metrics: LeaderSlotTimingMetrics, + prioritization_fees_metric: LeaderPrioritizationFeesMetrics, + // Used by tests to check if the `self.report()` method was called is_reported: bool, } impl LeaderSlotMetrics { - pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self { + pub(crate) fn new( + id: u32, + slot: Slot, + bank_creation_time: &Instant, + unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>, + ) -> Self { Self { id, slot, @@ -290,6 +361,9 @@ impl LeaderSlotMetrics { transaction_error_metrics: TransactionErrorMetrics::new(), vote_packet_count_metrics: VotePacketCountMetrics::new(), timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time), + prioritization_fees_metric: LeaderPrioritizationFeesMetrics::new( + unprocessed_transaction_storage, + ), is_reported: false, } } @@ -301,6 +375,7 @@ impl LeaderSlotMetrics { self.transaction_error_metrics.report(self.id, self.slot); self.packet_count_metrics.report(self.id, self.slot); self.vote_packet_count_metrics.report(self.id, self.slot); + self.prioritization_fees_metric.report(self.id, self.slot); } /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None @@ -372,6 +447,7 @@ impl LeaderSlotMetricsTracker { pub(crate) fn check_leader_slot_boundary( &mut self, bank_start: Option<&BankStart>, + unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>, ) -> MetricsTrackerAction { match (self.leader_slot_metrics.as_mut(), bank_start) { (None, None) => MetricsTrackerAction::Noop, @@ -387,6 +463,7 @@ impl LeaderSlotMetricsTracker { self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, + unprocessed_transaction_storage, ))) } @@ -398,6 +475,7 @@ impl LeaderSlotMetricsTracker { self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, + unprocessed_transaction_storage, ))) } else { MetricsTrackerAction::Noop @@ -449,6 +527,8 @@ impl LeaderSlotMetricsTracker { cost_model_us, ref execute_and_commit_timings, error_counters, + min_prioritization_fees, + max_prioritization_fees, .. } = process_transactions_summary; @@ -525,6 +605,23 @@ impl LeaderSlotMetricsTracker { *cost_model_us ); + leader_slot_metrics + .packet_count_metrics + .min_prioritization_fees = std::cmp::min( + leader_slot_metrics + .packet_count_metrics + .min_prioritization_fees, + *min_prioritization_fees, + ); + leader_slot_metrics + .packet_count_metrics + .max_prioritization_fees = std::cmp::min( + leader_slot_metrics + .packet_count_metrics + .max_prioritization_fees, + *max_prioritization_fees, + ); + leader_slot_metrics .timing_metrics .execute_and_commit_timings @@ -896,7 +993,7 @@ mod tests { .. } = setup_test_slot_boundary_banks(); // Test that with no bank being tracked, and no new bank being tracked, nothing is reported - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -916,8 +1013,8 @@ mod tests { // Test case where the thread has not detected a leader bank, and now sees a leader bank. // Metrics should not be reported because leader slot has not ended assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); - let action = - leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert_eq!( mem::discriminant(&MetricsTrackerAction::NewTracker(None)), mem::discriminant(&action) @@ -941,12 +1038,12 @@ mod tests { { // Setup first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -959,7 +1056,7 @@ mod tests { } { // Assert no-op if still no new bank - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -981,13 +1078,13 @@ mod tests { { // Setup with first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert nop-op if same bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -996,7 +1093,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -1025,13 +1122,13 @@ mod tests { { // Setup with first_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&next_poh_recorder_bank), None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) @@ -1044,7 +1141,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) @@ -1072,13 +1169,13 @@ mod tests { { // Setup with next_bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&next_poh_recorder_bank), None); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + .check_leader_slot_boundary(Some(&first_poh_recorder_bank), None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) @@ -1091,7 +1188,7 @@ mod tests { } { // Assert reporting if slot has ended - let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), mem::discriminant(&action) diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 225ff6a53e18c5..b812b9bd4920c8 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -17,6 +17,7 @@ use { TOTAL_BUFFERED_PACKETS, }, crossbeam_channel::RecvTimeoutError, + itertools::MinMaxResult, solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics, solana_cost_model::cost_model::CostModel, solana_measure::measure_us, @@ -95,10 +96,11 @@ impl SchedulerController { if !self.receive_and_buffer_packets(&decision) { break; } - // Report metrics only if there is data. // Reset intervals when appropriate, regardless of report. let should_report = self.count_metrics.has_data(); + self.count_metrics + .update_prioritization_stats(self.container.get_min_max_prioritization_fees()); self.count_metrics.maybe_report_and_reset(should_report); self.timing_metrics.maybe_report_and_reset(should_report); self.worker_metrics @@ -419,6 +421,10 @@ struct SchedulerCountMetrics { num_dropped_on_age_and_status: usize, /// Number of transactions that were dropped due to exceeded capacity. num_dropped_on_capacity: usize, + /// Min prioritization fees in the transaction container + min_prioritization_fees: u64, + /// Max prioritization fees in the transaction container + max_prioritization_fees: u64, } impl SchedulerCountMetrics { @@ -468,7 +474,17 @@ impl SchedulerCountMetrics { self.num_dropped_on_age_and_status, i64 ), - ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64) + ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), + ( + "min_prioritization_fees", + self.get_min_prioritization_fees(), + i64 + ), + ( + "max_prioritization_fees", + self.get_max_prioritization_fees(), + i64 + ) ); } @@ -504,6 +520,38 @@ impl SchedulerCountMetrics { self.num_dropped_on_clear = 0; self.num_dropped_on_age_and_status = 0; self.num_dropped_on_capacity = 0; + self.min_prioritization_fees = u64::MAX; + self.max_prioritization_fees = 0; + } + + pub fn update_prioritization_stats(&mut self, min_max_fees: MinMaxResult) { + // update min/max priotization fees + match min_max_fees { + itertools::MinMaxResult::NoElements => { + // do nothing + } + itertools::MinMaxResult::OneElement(e) => { + self.min_prioritization_fees = e; + self.max_prioritization_fees = e; + } + itertools::MinMaxResult::MinMax(min, max) => { + self.min_prioritization_fees = min; + self.max_prioritization_fees = max; + } + } + } + + pub fn get_min_prioritization_fees(&self) -> u64 { + // to avoid getting u64::max recorded by metrics / in case of edge cases + if self.min_prioritization_fees != u64::MAX { + self.min_prioritization_fees + } else { + 0 + } + } + + pub fn get_max_prioritization_fees(&self) -> u64 { + self.max_prioritization_fees } } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index d7d79cb21b7c32..f0688dee67bb5f 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -4,6 +4,7 @@ use { transaction_state::{SanitizedTransactionTTL, TransactionState}, }, crate::banking_stage::scheduler_messages::TransactionId, + itertools::MinMaxResult, min_max_heap::MinMaxHeap, solana_cost_model::transaction_cost::TransactionCost, solana_runtime::transaction_priority_details::TransactionPriorityDetails, @@ -149,6 +150,16 @@ impl TransactionStateContainer { .remove(id) .expect("transaction must exist"); } + + pub(crate) fn get_min_max_prioritization_fees(&self) -> MinMaxResult { + match self.priority_queue.peek_min() { + Some(min) => match self.priority_queue.peek_max() { + Some(max) => MinMaxResult::MinMax(min.priority, max.priority), + None => MinMaxResult::OneElement(min.priority), + }, + None => MinMaxResult::NoElements, + } + } } #[cfg(test)] diff --git a/core/src/banking_stage/unprocessed_packet_batches.rs b/core/src/banking_stage/unprocessed_packet_batches.rs index ff323ef25f18ee..9341fd4a54ec61 100644 --- a/core/src/banking_stage/unprocessed_packet_batches.rs +++ b/core/src/banking_stage/unprocessed_packet_batches.rs @@ -193,6 +193,14 @@ impl UnprocessedPacketBatches { self.packet_priority_queue.is_empty() } + pub fn get_min_priority(&self) -> Option { + self.packet_priority_queue.peek_min().map(|x| x.priority()) + } + + pub fn get_max_priority(&self) -> Option { + self.packet_priority_queue.peek_max().map(|x| x.priority()) + } + fn push_internal(&mut self, deserialized_packet: DeserializedPacket) { // Push into the priority queue self.packet_priority_queue diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index f8d99c77900c51..d5103b7df370a0 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -282,6 +282,24 @@ impl UnprocessedTransactionStorage { } } + pub fn get_min_priority(&self) -> Option { + match self { + Self::VoteStorage(_) => None, + Self::LocalTransactionStorage(transaction_storage) => { + transaction_storage.get_min_priority() + } + } + } + + pub fn get_max_priority(&self) -> Option { + match self { + Self::VoteStorage(_) => None, + Self::LocalTransactionStorage(transaction_storage) => { + transaction_storage.get_max_priority() + } + } + } + /// Returns the maximum number of packets a receive should accept pub fn max_receive_size(&self) -> usize { match self { @@ -529,6 +547,14 @@ impl ThreadLocalUnprocessedPackets { self.unprocessed_packet_batches.len() } + pub fn get_min_priority(&self) -> Option { + self.unprocessed_packet_batches.get_min_priority() + } + + pub fn get_max_priority(&self) -> Option { + self.unprocessed_packet_batches.get_max_priority() + } + fn max_receive_size(&self) -> usize { self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len() }