Skip to content

Commit

Permalink
Merge pull request jito-foundation#10 from solana-forge/forge/message…
Browse files Browse the repository at this point in the history
…_hash_filter

cache simulation result during leader slots
  • Loading branch information
semgoSE authored Mar 13, 2024
2 parents c002fac + 20902f6 commit 38bf619
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
32 changes: 28 additions & 4 deletions core/src/pbs/delayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
extract_first_signature,
filters::SubscriptionFilters,
grpc::{PbsBatch, SanitizedTransactionWithSimulationResult, SimulationResult},
simulation_result_cache::SimulationResultCache,
slot_boundary::SlotBoundaryStatus,
},
},
Expand Down Expand Up @@ -58,6 +59,7 @@ struct PbsDelayerStageStats {
num_empty_batches: u64,
num_simulated_chunks: u64,
num_simulated_txs: u64,
num_skip_simulation_txs: u64,
simulation_us: u64,

num_failed_sims: u64,
Expand Down Expand Up @@ -134,6 +136,10 @@ impl PbsDelayerStageStats {
"pbs_delayer-stats",
("num_unprocessed_txs", self.num_unprocessed_txs, i64),
);
datapoint_info!(
"pbs_delayer-stats",
("num_skip_simulation_txs", self.num_skip_simulation_txs, i64),
);
}
}

Expand Down Expand Up @@ -165,6 +171,7 @@ async fn run_delayer(actor: PacketDelayer) {
let mut unprocessed_batches = VecDeque::new();
let mut delay_queue = DelayQueue::new();
let mut drop_signatures: HashSet<Signature> = HashSet::new();
let mut simulation_result_cache = SimulationResultCache::new();
let mut metrics_tick = interval(Duration::from_secs(1));

while !exit.load(Ordering::Relaxed) {
Expand All @@ -177,8 +184,9 @@ async fn run_delayer(actor: PacketDelayer) {
break;
}
if let SlotBoundaryStatus::StandBy = *slot_boundary_watch.borrow_and_update() {
// Clear drop signatures at the end of leader slot
// Clear drop signatures and simulation_result_cache at the end of leader slot
drop_signatures.clear();
simulation_result_cache.clear();
}
}

Expand Down Expand Up @@ -239,8 +247,9 @@ async fn run_delayer(actor: PacketDelayer) {
Some(batch) = futures::future::ready(unprocessed_batches.front_mut()) => {
let pbs_batch = {
let bank = bank_forks.read().unwrap().working_bank();
batch.simulate_chunk(bank.as_ref(), SIMULATION_CHUNK_SIZE, &mut stats)
batch.simulate_chunk(bank.as_ref(), SIMULATION_CHUNK_SIZE, &simulation_result_cache, &mut stats)
};
simulation_result_cache.populate_with_failed_simulations(&pbs_batch);
if pbs.send(pbs_batch).is_err() {
error!("pbs receiver closed");
break;
Expand Down Expand Up @@ -352,22 +361,37 @@ impl PacketBatchInProcess {
&mut self,
bank: &Bank,
chunk_size: usize,
simulation_result_cache: &SimulationResultCache,
stats: &mut PbsDelayerStageStats,
) -> Vec<SanitizedTransactionWithSimulationResult> {
let start = self.unprocessed.len().saturating_sub(chunk_size);
let batch_size = self.unprocessed.len() - start;

let batch: Vec<_> = self
.unprocessed
.drain(start..)
.map(|tx| {
.filter_map(|tx| {
if simulation_result_cache.contains(&tx) {
// Skip simulation and processing of already simulated tx but with different
// blockhash
return None;
}
let (simulation_result, simulation_us) = measure_us!(simulate(&tx, bank));
saturating_add_assign!(stats.simulation_us, simulation_us);
collect_simulation_stats(&simulation_result, stats);
SanitizedTransactionWithSimulationResult::new(tx, simulation_result)
Some(SanitizedTransactionWithSimulationResult::new(
tx,
simulation_result,
))
})
.collect();

saturating_add_assign!(stats.num_simulated_chunks, 1);
saturating_add_assign!(stats.num_simulated_txs, batch.len() as u64);
saturating_add_assign!(
stats.num_skip_simulation_txs,
(batch_size - batch.len()) as u64
);

batch
}
Expand Down
1 change: 1 addition & 0 deletions core/src/pbs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod forwarder;
mod grpc;
mod interceptor;
pub mod pbs_stage;
mod simulation_result_cache;
mod slot_boundary;

#[derive(Error, Debug)]
Expand Down
45 changes: 45 additions & 0 deletions core/src/pbs/simulation_result_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use {
crate::pbs::grpc::SanitizedTransactionWithSimulationResult,
solana_bundle::bundle_execution::LoadAndExecuteBundleError,
solana_sdk::{hash::Hash, transaction::SanitizedTransaction},
std::collections::HashSet,
};

pub struct SimulationResultCache {
cache: HashSet<Hash>,
}

impl SimulationResultCache {
pub fn new() -> Self {
Self {
cache: HashSet::new(),
}
}

pub fn clear(&mut self) {
self.cache.clear();
}

pub fn contains(&self, sanitized_transaction: &SanitizedTransaction) -> bool {
let mut message = sanitized_transaction.to_versioned_transaction().message;
message.set_recent_blockhash(Default::default());
self.cache.contains(&message.hash())
}

pub fn populate_with_failed_simulations(
&mut self,
simulation_results: &[SanitizedTransactionWithSimulationResult],
) {
self.cache.extend(simulation_results.iter().filter_map(|v| {
if let Some(Err(LoadAndExecuteBundleError::TransactionError { .. })) =
v.simulation_result
{
let mut message = v.transaction.to_versioned_transaction().message;
message.set_recent_blockhash(Default::default());
Some(message.hash())
} else {
None
}
}))
}
}

0 comments on commit 38bf619

Please sign in to comment.