Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: some blob reinject improvements #5441

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ impl BlobStore for DiskFileBlobStore {
self.inner.get_one(tx)
}

fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
self.inner.contains(tx)
}

fn get_all(
&self,
txs: Vec<B256>,
Expand Down Expand Up @@ -183,6 +187,15 @@ impl DiskFileBlobStoreInner {
Ok(())
}

/// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
if self.blob_cache.lock().get(&tx).is_some() {
return Ok(true)
}
// we only check if the file exists and assume it's valid
Ok(self.blob_disk_file(tx).is_file())
}

/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
Expand Down Expand Up @@ -438,6 +451,7 @@ mod tests {
assert!(blobs.contains(&(tx, blob)), "missing blob {:?}", tx);
}

assert!(store.contains(all_hashes[0]).unwrap());
store.delete_all(all_hashes.clone()).unwrap();
store.clear_cache();

Expand All @@ -446,6 +460,7 @@ mod tests {
let all = store.get_all(all_hashes.clone()).unwrap();
assert!(all.is_empty());

assert!(!store.contains(all_hashes[0]).unwrap());
assert!(store.get_exact(all_hashes).is_err());
}
}
5 changes: 5 additions & 0 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ impl BlobStore for InMemoryBlobStore {
Ok(store.get(&tx).cloned())
}

fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
let store = self.inner.store.read();
Ok(store.contains_key(&tx))
}

fn get_all(
&self,
txs: Vec<B256>,
Expand Down
3 changes: 3 additions & 0 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;

/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;

/// Retrieves all decoded blob data for the given transaction hashes.
///
/// This only returns the blobs that were found in the store.
Expand Down
4 changes: 4 additions & 0 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl BlobStore for NoopBlobStore {
Ok(None)
}

fn contains(&self, _tx: B256) -> Result<bool, BlobStoreError> {
Ok(false)
}

fn get_all(
&self,
_txs: Vec<B256>,
Expand Down
11 changes: 9 additions & 2 deletions crates/transaction-pool/src/blobstore/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ impl BlobStoreCanonTracker {
}

/// Adds all blocks to the tracked list of blocks.
///
/// Replaces any previously tracked blocks with the set of transactions.
pub fn add_blocks(
&mut self,
blocks: impl IntoIterator<Item = (BlockNumber, impl IntoIterator<Item = B256>)>,
Expand All @@ -32,6 +34,9 @@ impl BlobStoreCanonTracker {
}

/// Adds all blob transactions from the given chain to the tracker.
///
/// Note: In case this is a chain that's part of a reorg, this replaces previously tracked
/// blocks.
pub fn add_new_chain_blocks(&mut self, blocks: &ChainBlocks<'_>) {
let blob_txs = blocks.iter().map(|(num, blocks)| {
let iter =
Expand All @@ -42,10 +47,12 @@ impl BlobStoreCanonTracker {
}

/// Invoked when a block is finalized.
pub fn on_finalized_block(&mut self, number: BlockNumber) -> BlobStoreUpdates {
///
/// This returns all blob transactions that were included in blocks that are now finalized.
pub fn on_finalized_block(&mut self, finalized_block: BlockNumber) -> BlobStoreUpdates {
let mut finalized = Vec::new();
while let Some(entry) = self.blob_txs_in_blocks.first_entry() {
if *entry.key() <= number {
if *entry.key() <= finalized_block {
finalized.extend(entry.remove_entry().1);
} else {
break
Expand Down
3 changes: 3 additions & 0 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ where
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
if transactions.is_empty() {
return Ok(Vec::new())
}
let validated = self.validate_all(origin, transactions).await?;

let transactions =
Expand Down
5 changes: 3 additions & 2 deletions crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,12 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
// to be re-injected
//
// Note: we no longer know if the tx was local or external
// Because the transactions are not finalized, the corresponding blobs are still in
// blob store (if we previously received them from the network)
metrics.inc_reinserted_transactions(pruned_old_transactions.len());
let _ = pool.add_external_transactions(pruned_old_transactions).await;

// keep track of mined blob transactions
// TODO(mattsse): handle reorged transactions
// keep track of new mined blob transactions
blob_store_tracker.add_new_chain_blocks(&new_blocks);
}
CanonStateNotification::Commit { new } => {
Expand Down
6 changes: 5 additions & 1 deletion crates/transaction-pool/src/validate/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,11 @@ where
)
}
EthBlobTransactionSidecar::Missing => {
if let Ok(Some(_)) = self.blob_store.get(*transaction.hash()) {
// This can happen for re-injected blob transactions (on re-org), since the blob
// is stripped from the transaction and not included in a block.
// check if the blob is in the store, if it's included we previously validated
// it and inserted it
if let Ok(true) = self.blob_store.contains(*transaction.hash()) {
// validated transaction is already in the store
} else {
return TransactionValidationOutcome::Invalid(
Expand Down