diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 364ea27cc31d..f34329b5031c 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -17,10 +17,7 @@ use reth_provider::{ BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider, TransactionsProvider, }; -use reth_rpc_eth_types::{ - utils::{binary_search, recover_raw_transaction}, - EthApiError, SignError, TransactionSource, -}; +use reth_rpc_eth_types::{utils::binary_search, EthApiError, SignError, TransactionSource}; use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context}; use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; use std::sync::Arc; @@ -63,6 +60,14 @@ pub trait EthTransactions: LoadTransaction { #[expect(clippy::type_complexity)] fn signers(&self) -> &parking_lot::RwLock>>>>; + /// Decodes and recovers the transaction and submits it to the pool. + /// + /// Returns the hash of the transaction. + fn send_raw_transaction( + &self, + tx: Bytes, + ) -> impl Future> + Send; + /// Returns the transaction by hash. /// /// Checks the pool and state. @@ -335,29 +340,6 @@ pub trait EthTransactions: LoadTransaction { } } - /// Decodes and recovers the transaction and submits it to the pool. - /// - /// Returns the hash of the transaction. - fn send_raw_transaction( - &self, - tx: Bytes, - ) -> impl Future> + Send { - async move { - let recovered = recover_raw_transaction(&tx)?; - let pool_transaction = - ::Transaction::from_pooled(recovered); - - // submit the transaction to the pool with a `Local` origin - let hash = self - .pool() - .add_transaction(TransactionOrigin::Local, pool_transaction) - .await - .map_err(Self::Error::from_eth_err)?; - - Ok(hash) - } - } - /// Signs transaction with a matching signer, if any and submits the transaction to the pool. /// Returns the hash of the signed transaction. fn send_transaction( diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 1fe08d1c57f8..0c8f06ba2b83 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use alloy_consensus::BlockHeader; use alloy_eips::BlockNumberOrTag; use alloy_network::Ethereum; -use alloy_primitives::U256; +use alloy_primitives::{Bytes, U256}; use derive_more::Deref; use reth_primitives::NodePrimitives; use reth_provider::{ @@ -26,10 +26,12 @@ use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, TaskSpawner, TokioTaskExecutor, }; -use tokio::sync::Mutex; +use tokio::sync::{broadcast, Mutex}; use crate::eth::EthTxBuilder; +const DEFAULT_BROADCAST_CAPACITY: usize = 2000; + /// `Eth` API implementation. /// /// This type provides the functionality for handling `eth_` related requests. @@ -270,6 +272,9 @@ pub struct EthApiInner { /// Guard for getproof calls blocking_task_guard: BlockingTaskGuard, + + /// Transaction broadcast channel + raw_tx_sender: broadcast::Sender, } impl EthApiInner @@ -304,6 +309,8 @@ where .unwrap_or_default(), ); + let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY); + Self { provider, pool, @@ -321,6 +328,7 @@ where fee_history_cache, evm_config, blocking_task_guard: BlockingTaskGuard::new(proof_permits), + raw_tx_sender, } } } @@ -428,6 +436,18 @@ where pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard { &self.blocking_task_guard } + + /// Returns [`broadcast::Receiver`] of new raw transactions + #[inline] + pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver { + self.raw_tx_sender.subscribe() + } + + /// Broadcasts raw transaction if there are active subscribers. + #[inline] + pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { + let _ = self.raw_tx_sender.send(raw_tx); + } } #[cfg(test)] diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 04ed812fab2f..3fcdfb3f0681 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -1,13 +1,14 @@ //! Contains RPC handler implementations specific to transactions +use crate::EthApi; +use alloy_primitives::{Bytes, B256}; use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider}; use reth_rpc_eth_api::{ helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, - FullEthApiTypes, RpcNodeCoreExt, + FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt, }; -use reth_transaction_pool::TransactionPool; - -use crate::EthApi; +use reth_rpc_eth_types::utils::recover_raw_transaction; +use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; impl EthTransactions for EthApi @@ -19,6 +20,27 @@ where fn signers(&self) -> &parking_lot::RwLock>>>> { self.inner.signers() } + + /// Decodes and recovers the transaction and submits it to the pool. + /// + /// Returns the hash of the transaction. + async fn send_raw_transaction(&self, tx: Bytes) -> Result { + let recovered = recover_raw_transaction(&tx)?; + + // broadcast raw transaction to subscribers if there is any. + self.broadcast_raw_transaction(tx); + + let pool_transaction = ::Transaction::from_pooled(recovered); + + // submit the transaction to the pool with a `Local` origin + let hash = self + .pool() + .add_transaction(TransactionOrigin::Local, pool_transaction) + .await + .map_err(Self::Error::from_eth_err)?; + + Ok(hash) + } } impl LoadTransaction