Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(EthApi): Add broadcast stream for incoming raw transactions #13165

Merged
merged 12 commits into from
Dec 11, 2024
36 changes: 9 additions & 27 deletions crates/rpc/rpc-eth-api/src/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use reth_provider::{
BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
TransactionsProvider,
};
use reth_rpc_eth_types::{
utils::{binary_search, recover_raw_transaction},
EthApiError, SignError, TransactionSource,
};
use reth_rpc_eth_types::{utils::binary_search, EthApiError, SignError, TransactionSource};
use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use std::sync::Arc;
Expand Down Expand Up @@ -63,6 +60,14 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
#[expect(clippy::type_complexity)]
fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>>;

/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
fn send_raw_transaction(
&self,
tx: Bytes,
) -> impl Future<Output = Result<B256, Self::Error>> + Send;

/// Returns the transaction by hash.
///
/// Checks the pool and state.
Expand Down Expand Up @@ -335,29 +340,6 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
}
}

/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
fn send_raw_transaction(
&self,
tx: Bytes,
) -> impl Future<Output = Result<B256, Self::Error>> + Send {
async move {
let recovered = recover_raw_transaction(&tx)?;
let pool_transaction =
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);

// submit the transaction to the pool with a `Local` origin
let hash = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await
.map_err(Self::Error::from_eth_err)?;

Ok(hash)
}
}

/// Signs transaction with a matching signer, if any and submits the transaction to the pool.
/// Returns the hash of the signed transaction.
fn send_transaction(
Expand Down
25 changes: 23 additions & 2 deletions crates/rpc/rpc/src/eth/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_network::Ethereum;
use alloy_primitives::U256;
use alloy_primitives::{Bytes, U256};
use derive_more::Deref;
use reth_primitives::NodePrimitives;
use reth_provider::{
Expand All @@ -26,10 +26,13 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor,
};
use tokio::sync::Mutex;
use tokio::sync::{broadcast, Mutex};
use tracing::trace;

use crate::eth::EthTxBuilder;

const DEFAULT_BROADCAST_CAPACITY: usize = 2000;

/// `Eth` API implementation.
///
/// This type provides the functionality for handling `eth_` related requests.
Expand Down Expand Up @@ -270,6 +273,9 @@ pub struct EthApiInner<Provider: BlockReader, Pool, Network, EvmConfig> {

/// Guard for getproof calls
blocking_task_guard: BlockingTaskGuard,

/// Transaction broadcast channel
raw_tx_sender: broadcast::Sender<Bytes>,
}

impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
Expand Down Expand Up @@ -304,6 +310,8 @@ where
.unwrap_or_default(),
);

let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY);

Self {
provider,
pool,
Expand All @@ -321,6 +329,7 @@ where
fee_history_cache,
evm_config,
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
raw_tx_sender,
}
}
}
Expand Down Expand Up @@ -428,6 +437,18 @@ where
pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard {
&self.blocking_task_guard
}

/// Returns [`broadcast::Receiver`] of new raw transactions
#[inline]
pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver<Bytes> {
self.raw_tx_sender.subscribe()
}

/// Broadcasts raw transaction if there are active subscribers.
#[inline]
pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) {
let _ = self.raw_tx_sender.send(raw_tx);
}
}

#[cfg(test)]
Expand Down
30 changes: 26 additions & 4 deletions crates/rpc/rpc/src/eth/helpers/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! Contains RPC handler implementations specific to transactions

use crate::EthApi;
use alloy_primitives::{Bytes, B256};
use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider};
use reth_rpc_eth_api::{
helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking},
FullEthApiTypes, RpcNodeCoreExt,
FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt,
};
use reth_transaction_pool::TransactionPool;

use crate::EthApi;
use reth_rpc_eth_types::utils::recover_raw_transaction;
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};

impl<Provider, Pool, Network, EvmConfig> EthTransactions
for EthApi<Provider, Pool, Network, EvmConfig>
Expand All @@ -19,6 +20,27 @@ where
fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>> {
self.inner.signers()
}

/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
let recovered = recover_raw_transaction(&tx)?;

// broadcast raw transaction to subscribers if there is any.
self.broadcast_raw_transaction(tx);

let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);

// submit the transaction to the pool with a `Local` origin
let hash = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await
.map_err(Self::Error::from_eth_err)?;

Ok(hash)
}
}

impl<Provider, Pool, Network, EvmConfig> LoadTransaction
Expand Down
Loading