From a8868ab28fc7fb5bbfac86ceb8427529bb139453 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 5 Jan 2024 18:23:33 -0800 Subject: [PATCH] output txn signature for debug purpose trying txn mask matching output txn to figure out why txn is not exactly matched Use 62 and 61 portion track fetch performance using random txn mask track sigverify performance using random txn mask track banking stage performance using random txn mask adding missing cargo lock file add debug messages Revert "add debug messages" This reverts commit 96aead5cbc4acb5b2fc9d8a37fcd506c73ddf552. fixed some clippy issues check-crate issue Fix a clippy issue Fix a clippy issue debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points debug why txns in banking stage shows fewer performance tracking points get higher PPS for testing purpose more debug messages on why txn is skipped display if tracer packet in log add debug before calling processing_function debug at the initial of banking stage track if a txn is forwarded dependency order missing cargo file clean up debug messages Do not use TRACER_PACKET, use its own bit rename some functions addressed some comments from Trent Update core/src/banking_stage/immutable_deserialized_packet.rs Co-authored-by: Trent Nelson addressed some comments from Trent Do not use binary_search, do simple compare in one loop --- Cargo.lock | 16 ++++++ Cargo.toml | 2 + core/Cargo.toml | 1 + core/src/banking_stage/consumer.rs | 27 ++++++++++ .../immutable_deserialized_packet.rs | 13 ++++- .../unprocessed_transaction_storage.rs | 1 + core/src/sigverify_stage.rs | 29 +++++++++- programs/sbf/Cargo.lock | 16 ++++++ sdk/src/packet.rs | 13 +++++ sdk/src/transaction/versioned/sanitized.rs | 4 ++ streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 42 ++++++++++++++- transaction-metrics-tracker/Cargo.toml | 25 +++++++++ transaction-metrics-tracker/src/lib.rs | 54 +++++++++++++++++++ 14 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 transaction-metrics-tracker/Cargo.toml create mode 100644 transaction-metrics-tracker/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 81d6aa64125445..91f8f0d0bb5c01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5828,6 +5828,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -7193,6 +7194,7 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -7359,6 +7361,20 @@ dependencies = [ "solana-version", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 804e9ba19077da..0fab5a1348c228 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,6 +106,7 @@ members = [ "tokens", "tpu-client", "transaction-dos", + "transaction-metrics-tracker", "transaction-status", "turbine", "udp-client", @@ -378,6 +379,7 @@ solana-system-program = { path = "programs/system", version = "=1.19.0" } solana-test-validator = { path = "test-validator", version = "=1.19.0" } solana-thin-client = { path = "thin-client", version = "=1.19.0" } solana-tpu-client = { path = "tpu-client", version = "=1.19.0", default-features = false } +solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.19.0" } solana-transaction-status = { path = "transaction-status", version = "=1.19.0" } solana-turbine = { path = "turbine", version = "=1.19.0" } solana-udp-client = { path = "udp-client", version = "=1.19.0" } diff --git a/core/Cargo.toml b/core/Cargo.toml index e2a936cdabc4c1..1fd25ec38a8d3b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -67,6 +67,7 @@ solana-send-transaction-service = { workspace = true } solana-streamer = { workspace = true } solana-svm = { workspace = true } solana-tpu-client = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f4ac6c6040eda8..55f57c0a684b5a 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -208,6 +208,33 @@ impl Consumer { .slot_metrics_tracker .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + // Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes + // We assume the retryable_transaction_indexes is already sorted. + let mut retryable_idx = 0; + for (index, packet) in packets_to_process.iter().enumerate() { + if packet.original_packet().meta().is_perf_track_packet() { + if let Some(start_time) = packet.start_time() { + if retryable_idx >= retryable_transaction_indexes.len() + || retryable_transaction_indexes[retryable_idx] != index + { + let duration = Instant::now().duration_since(*start_time); + + debug!( + "Banking stage processing took {duration:?} for transaction {:?}", + packet.transaction().get_signatures().first() + ); + inc_new_counter_info!( + "txn-metrics-banking-stage-process-us", + duration.as_micros() as usize + ); + } else { + // This packet is retried, advance the retry index to the next, as the next packet's index will + // certainly be > than this. + retryable_idx += 1; + } + } + } + } Some(retryable_transaction_indexes) } diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs index 26ede7045d3480..6eb5d68ecaaca5 100644 --- a/core/src/banking_stage/immutable_deserialized_packet.rs +++ b/core/src/banking_stage/immutable_deserialized_packet.rs @@ -13,7 +13,7 @@ use { VersionedTransaction, }, }, - std::{cmp::Ordering, mem::size_of, sync::Arc}, + std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant}, thiserror::Error, }; @@ -41,10 +41,16 @@ pub struct ImmutableDeserializedPacket { message_hash: Hash, is_simple_vote: bool, compute_budget_details: ComputeBudgetDetails, + banking_stage_start_time: Option, } impl ImmutableDeserializedPacket { pub fn new(packet: Packet) -> Result { + let banking_stage_start_time = packet + .meta() + .is_perf_track_packet() + .then_some(Instant::now()); + let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?; let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; let message_bytes = packet_message(&packet)?; @@ -67,6 +73,7 @@ impl ImmutableDeserializedPacket { message_hash, is_simple_vote, compute_budget_details, + banking_stage_start_time, }) } @@ -98,6 +105,10 @@ impl ImmutableDeserializedPacket { self.compute_budget_details.clone() } + pub fn start_time(&self) -> &Option { + &self.banking_stage_start_time + } + // This function deserializes packets into transactions, computes the blake3 hash of transaction // messages, and verifies secp256k1 instructions. pub fn build_sanitized_transaction( diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index fcc68050b72d4c..52706f8c2bf63b 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -924,6 +924,7 @@ impl ThreadLocalUnprocessedPackets { .iter() .map(|p| (*p).clone()) .collect_vec(); + let retryable_packets = if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process, payload) { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index e5e06a3bc701c9..eb6f92f6bfaf7e 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -18,9 +18,11 @@ use { count_discarded_packets, count_packets_in_batches, count_valid_packets, shrink_batches, }, }, - solana_sdk::timing, + solana_sdk::{signature::Signature, timing}, solana_streamer::streamer::{self, StreamerError}, + solana_transaction_metrics_tracker::get_signature_from_packet, std::{ + collections::HashMap, thread::{self, Builder, JoinHandle}, time::Instant, }, @@ -296,8 +298,20 @@ impl SigVerifyStage { verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { - let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + // track sigverify start time for interested packets + for batch in &batches { + for packet in batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + packet_perf_measure.insert(*signature, Instant::now()); + } + } + } + } let batches_len = batches.len(); debug!( "@{:?} verifier: verifying: {}", @@ -370,6 +384,17 @@ impl SigVerifyStage { (num_packets as f32 / verify_time.as_s()) ); + for (signature, start_time) in packet_perf_measure.drain() { + let duration = Instant::now().duration_since(start_time); + debug!( + "Sigverify took {duration:?} for transaction {:?}", + Signature::from(signature) + ); + inc_new_counter_info!( + "txn-metrics-sigverify-packet-verify-us", + duration.as_micros() as usize + ); + } stats .recv_batches_us_hist .increment(recv_duration.as_micros() as u64) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 1d11ce6e65542e..60e1c119f1cea2 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4908,6 +4908,7 @@ dependencies = [ "solana-streamer", "solana-svm", "solana-tpu-client", + "solana-transaction-metrics-tracker", "solana-transaction-status", "solana-turbine", "solana-unified-scheduler-pool", @@ -6267,6 +6268,7 @@ dependencies = [ "solana-metrics", "solana-perf", "solana-sdk", + "solana-transaction-metrics-tracker", "thiserror", "tokio", "x509-parser", @@ -6369,6 +6371,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "solana-transaction-metrics-tracker" +version = "1.19.0" +dependencies = [ + "Inflector", + "base64 0.21.7", + "bincode", + "lazy_static", + "log", + "rand 0.8.5", + "solana-perf", + "solana-sdk", +] + [[package]] name = "solana-transaction-status" version = "1.19.0" diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index faea9ab4753c67..8300b57218c696 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -33,6 +33,8 @@ bitflags! { /// the packet is built. /// This field can be removed when the above feature gate is adopted by mainnet-beta. const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; + /// For tracking performance + const PERF_TRACK_PACKET = 0b0100_0000; } } @@ -228,6 +230,12 @@ impl Meta { self.flags.set(PacketFlags::TRACER_PACKET, is_tracer); } + #[inline] + pub fn set_track_performance(&mut self, is_performance_track: bool) { + self.flags + .set(PacketFlags::PERF_TRACK_PACKET, is_performance_track); + } + #[inline] pub fn set_simple_vote(&mut self, is_simple_vote: bool) { self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote); @@ -261,6 +269,11 @@ impl Meta { self.flags.contains(PacketFlags::TRACER_PACKET) } + #[inline] + pub fn is_perf_track_packet(&self) -> bool { + self.flags.contains(PacketFlags::PERF_TRACK_PACKET) + } + #[inline] pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) diff --git a/sdk/src/transaction/versioned/sanitized.rs b/sdk/src/transaction/versioned/sanitized.rs index 61ecdfea56bb2a..b6311d5886b0e3 100644 --- a/sdk/src/transaction/versioned/sanitized.rs +++ b/sdk/src/transaction/versioned/sanitized.rs @@ -33,6 +33,10 @@ impl SanitizedVersionedTransaction { &self.message } + pub fn get_signatures(&self) -> &Vec { + &self.signatures + } + /// Consumes the SanitizedVersionedTransaction, returning the fields individually. pub fn destruct(self) -> (Vec, SanitizedVersionedMessage) { (self.signatures, self.message) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 8e1eb12dff1d42..22170e6426c433 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -29,6 +29,7 @@ rustls = { workspace = true, features = ["dangerous_configuration"] } solana-metrics = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } +solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } x509-parser = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 225412dd08b315..23234231fd2750 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -27,10 +27,14 @@ use { QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, }, - signature::Keypair, + signature::{Keypair, Signature}, timing, }, + solana_transaction_metrics_tracker::{ + get_signature_from_packet, signature_if_should_track_packet, + }, std::{ + collections::HashMap, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ @@ -81,6 +85,7 @@ struct PacketChunk { struct PacketAccumulator { pub meta: Meta, pub chunks: Vec, + pub start_time: Instant, } #[derive(Copy, Clone, Debug)] @@ -628,6 +633,7 @@ async fn packet_batch_sender( trace!("enter packet_batch_sender"); let mut batch_start_time = Instant::now(); loop { + let mut packet_perf_measure: HashMap<[u8; 64], std::time::Instant> = HashMap::default(); let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); let mut total_bytes: usize = 0; @@ -647,6 +653,7 @@ async fn packet_batch_sender( || (!packet_batch.is_empty() && elapsed >= coalesce) { let len = packet_batch.len(); + track_streamer_fetch_packet_performance(&packet_batch, &mut packet_perf_measure); if let Err(e) = packet_sender.send(packet_batch) { stats .total_packet_batch_send_err @@ -692,6 +699,13 @@ async fn packet_batch_sender( total_bytes += packet_batch[i].meta().size; + if let Some(signature) = + signature_if_should_track_packet(&packet_batch[i]).unwrap_or(None) + { + packet_perf_measure.insert(*signature, packet_accumulator.start_time); + // we set the PERF_TRACK_PACKET on + packet_batch[i].meta_mut().set_track_performance(true); + } stats .total_chunks_processed_by_batcher .fetch_add(num_chunks, Ordering::Relaxed); @@ -700,6 +714,30 @@ async fn packet_batch_sender( } } +fn track_streamer_fetch_packet_performance( + packet_batch: &PacketBatch, + packet_perf_measure: &mut HashMap<[u8; 64], Instant>, +) { + for packet in packet_batch.iter() { + if packet.meta().is_perf_track_packet() { + let signature = get_signature_from_packet(packet); + if let Ok(signature) = signature { + if let Some(start_time) = packet_perf_measure.remove(signature) { + let duration = Instant::now().duration_since(start_time); + debug!( + "QUIC streamer fetch stage took {duration:?} for transaction {:?}", + Signature::from(*signature) + ); + inc_new_counter_info!( + "txn-metrics-quic-streamer-packet-fetch-us", + duration.as_micros() as usize + ); + } + } + } + } +} + async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -854,6 +892,7 @@ async fn handle_chunk( *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), + start_time: Instant::now(), }); } @@ -1453,6 +1492,7 @@ pub mod test { offset, end_of_chunk: size, }], + start_time: Instant::now(), }; ptk_sender.send(packet_accum).await.unwrap(); } diff --git a/transaction-metrics-tracker/Cargo.toml b/transaction-metrics-tracker/Cargo.toml new file mode 100644 index 00000000000000..9bd82702a3ebb4 --- /dev/null +++ b/transaction-metrics-tracker/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "solana-transaction-metrics-tracker" +description = "Solana transaction metrics tracker" +documentation = "https://docs.rs/solana-transaction-metrics-tracker" +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = false + +[dependencies] +Inflector = { workspace = true } +base64 = { workspace = true } +bincode = { workspace = true } +# Update this borsh dependency to the workspace version once +lazy_static = { workspace = true } +log = { workspace = true } +rand = { workspace = true } +solana-perf = { workspace = true } +solana-sdk = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/transaction-metrics-tracker/src/lib.rs b/transaction-metrics-tracker/src/lib.rs new file mode 100644 index 00000000000000..02ae2c14fa8ca5 --- /dev/null +++ b/transaction-metrics-tracker/src/lib.rs @@ -0,0 +1,54 @@ +use { + lazy_static::lazy_static, + log::*, + rand::Rng, + solana_perf::sigverify::PacketError, + solana_sdk::{packet::Packet, short_vec::decode_shortu16_len, signature::SIGNATURE_BYTES}, +}; + +// The mask is 12 bits long (1<<12 = 4096), it means the probability of matching +// the transaction is 1/4096 assuming the portion being matched is random. +lazy_static! { + static ref TXN_MASK: u16 = rand::thread_rng().gen_range(0..4096); +} + +/// Check if a transaction given its signature matches the randomly selected mask. +/// The signaure should be from the reference of Signature +pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool { + // We do not use the highest signature byte as it is not really random + let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4; + trace!("Matching txn: {match_portion:b} {:b}", *TXN_MASK); + *TXN_MASK == match_portion +} + +/// Check if a transaction packet's signature matches the mask. +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn signature_if_should_track_packet( + packet: &Packet, +) -> Result, PacketError> { + let signature = get_signature_from_packet(packet)?; + Ok(should_track_transaction(signature).then_some(signature)) +} + +/// Get the signature of the transaction packet +/// This does a rudimentry verification to make sure the packet at least +/// contains the signature data and it returns the reference to the signature. +pub fn get_signature_from_packet(packet: &Packet) -> Result<&[u8; SIGNATURE_BYTES], PacketError> { + let (sig_len_untrusted, sig_start) = packet + .data(..) + .and_then(|bytes| decode_shortu16_len(bytes).ok()) + .ok_or(PacketError::InvalidShortVec)?; + + if sig_len_untrusted < 1 { + return Err(PacketError::InvalidSignatureLen); + } + + let signature = packet + .data(sig_start..sig_start.saturating_add(SIGNATURE_BYTES)) + .ok_or(PacketError::InvalidSignatureLen)?; + let signature = signature + .try_into() + .map_err(|_| PacketError::InvalidSignatureLen)?; + Ok(signature) +}