From 9ced5724cd238fe6e695007a60f6521511a34ba4 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Thu, 5 Dec 2024 18:14:59 +0100 Subject: [PATCH 01/10] feat: Add broadcast stream for incoming raw transactions --- Cargo.lock | 1 + .../rpc-eth-api/src/helpers/transaction.rs | 9 +++++++- crates/rpc/rpc/Cargo.toml | 1 + crates/rpc/rpc/src/eth/core.rs | 21 ++++++++++++++++++- crates/rpc/rpc/src/eth/helpers/transaction.rs | 9 ++++++-- 5 files changed, 37 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 666293e58e6b..59e7352248b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8896,6 +8896,7 @@ dependencies = [ "reth-rpc-types-compat", "reth-tasks", "reth-testing-utils", + "reth-tokio-util", "reth-transaction-pool", "revm", "revm-inspectors", diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index f73d761600e8..499499744600 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -62,6 +62,10 @@ pub trait EthTransactions: LoadTransaction { /// Singer access in default (L1) trait method implementations. fn signers(&self) -> &parking_lot::RwLock>>; + /// Broadcasts raw transaction to subscribed observers. Used internally by + /// `send_raw_transaction` to emit transactions as they are processed. + fn broadcast_raw_transaction(&self, raw_tx: Bytes); + /// Returns the transaction by hash. /// /// Checks the pool and state. @@ -342,7 +346,7 @@ pub trait EthTransactions: LoadTransaction { tx: Bytes, ) -> impl Future> + Send { async move { - let recovered = recover_raw_transaction(tx)?; + let recovered = recover_raw_transaction(tx.clone())?; let pool_transaction = ::Transaction::from_pooled(recovered.into()); @@ -353,6 +357,9 @@ pub trait EthTransactions: LoadTransaction { .await .map_err(Self::Error::from_eth_err)?; + // broadcast raw transaction to subscribers if there is any. + self.broadcast_raw_transaction(tx); + Ok(hash) } } diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 5efae46f0061..5f42c9354b61 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -36,6 +36,7 @@ reth-rpc-server-types.workspace = true reth-network-types.workspace = true reth-consensus.workspace = true reth-payload-validator.workspace = true +reth-tokio-util.workspace = true # ethereum alloy-consensus.workspace = true diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 0a17e5e5f2b9..d1d8e512ad99 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use alloy_consensus::BlockHeader; use alloy_eips::BlockNumberOrTag; use alloy_network::Ethereum; -use alloy_primitives::U256; +use alloy_primitives::{Bytes, U256}; use derive_more::Deref; use reth_primitives::NodePrimitives; use reth_provider::{ @@ -26,6 +26,7 @@ use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, TaskSpawner, TokioTaskExecutor, }; +use reth_tokio_util::{EventSender, EventStream}; use tokio::sync::Mutex; use crate::eth::EthTxBuilder; @@ -270,6 +271,9 @@ pub struct EthApiInner { /// Guard for getproof calls blocking_task_guard: BlockingTaskGuard, + + /// Transaction broadcast channel + raw_tx_sender: EventSender, } impl EthApiInner @@ -304,6 +308,8 @@ where .unwrap_or_default(), ); + let raw_tx_sender = EventSender::default(); + Self { provider, pool, @@ -321,6 +327,7 @@ where fee_history_cache, evm_config, blocking_task_guard: BlockingTaskGuard::new(proof_permits), + raw_tx_sender, } } } @@ -424,6 +431,18 @@ where pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard { &self.blocking_task_guard } + + /// Returns [`EventStream`] of new raw transactions. + #[inline] + pub fn subscribe_to_raw_transactions(&self) -> EventStream { + self.raw_tx_sender.new_listener() + } + + /// Broadcasts raw transaction to all subscribers. + #[inline] + pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { + self.raw_tx_sender.notify(raw_tx) + } } #[cfg(test)] diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 647e16c25af6..6d8cae963a58 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -1,5 +1,7 @@ //! Contains RPC handler implementations specific to transactions +use crate::EthApi; +use alloy_primitives::Bytes; use reth_provider::{BlockReader, BlockReaderIdExt, TransactionsProvider}; use reth_rpc_eth_api::{ helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, @@ -7,8 +9,6 @@ use reth_rpc_eth_api::{ }; use reth_transaction_pool::TransactionPool; -use crate::EthApi; - impl EthTransactions for EthApi where @@ -19,6 +19,11 @@ where fn signers(&self) -> &parking_lot::RwLock>> { self.inner.signers() } + + #[inline] + fn broadcast_raw_transaction(&self, raw_tx: Bytes) { + self.inner.broadcast_raw_transaction(raw_tx) + } } impl LoadTransaction From a64bf5db8f6e5d84f29b8882dacbeb679c182153 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Thu, 5 Dec 2024 18:31:43 +0100 Subject: [PATCH 02/10] fix: update OpEthApi --- crates/optimism/rpc/src/eth/transaction.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/optimism/rpc/src/eth/transaction.rs b/crates/optimism/rpc/src/eth/transaction.rs index b5d4ce2bc555..d361fabf4db3 100644 --- a/crates/optimism/rpc/src/eth/transaction.rs +++ b/crates/optimism/rpc/src/eth/transaction.rs @@ -52,6 +52,11 @@ where Ok(hash) } + + #[inline] + fn broadcast_raw_transaction(&self, raw_tx: Bytes) { + self.inner.eth_api.broadcast_raw_transaction(raw_tx) + } } impl LoadTransaction for OpEthApi From 20e8d27be99ceacf4fba3452e0999f47ef718aef Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Thu, 5 Dec 2024 20:34:51 +0100 Subject: [PATCH 03/10] fix: suggestions --- Cargo.lock | 1 - crates/rpc/rpc/Cargo.toml | 1 - crates/rpc/rpc/src/eth/core.rs | 21 ++++++++++++--------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59e7352248b5..666293e58e6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8896,7 +8896,6 @@ dependencies = [ "reth-rpc-types-compat", "reth-tasks", "reth-testing-utils", - "reth-tokio-util", "reth-transaction-pool", "revm", "revm-inspectors", diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 5f42c9354b61..5efae46f0061 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -36,7 +36,6 @@ reth-rpc-server-types.workspace = true reth-network-types.workspace = true reth-consensus.workspace = true reth-payload-validator.workspace = true -reth-tokio-util.workspace = true # ethereum alloy-consensus.workspace = true diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index d1d8e512ad99..325029fe807c 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -26,11 +26,12 @@ use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, TaskSpawner, TokioTaskExecutor, }; -use reth_tokio_util::{EventSender, EventStream}; -use tokio::sync::Mutex; +use tokio::sync::{broadcast, Mutex}; use crate::eth::EthTxBuilder; +const DEFAULT_BROADCAST_CAPACITY: usize = 2000; + /// `Eth` API implementation. /// /// This type provides the functionality for handling `eth_` related requests. @@ -273,7 +274,7 @@ pub struct EthApiInner { blocking_task_guard: BlockingTaskGuard, /// Transaction broadcast channel - raw_tx_sender: EventSender, + raw_tx_sender: broadcast::Sender, } impl EthApiInner @@ -308,7 +309,7 @@ where .unwrap_or_default(), ); - let raw_tx_sender = EventSender::default(); + let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY); Self { provider, @@ -432,16 +433,18 @@ where &self.blocking_task_guard } - /// Returns [`EventStream`] of new raw transactions. + /// Returns [`broadcast::Receiver`] of new raw transactions. #[inline] - pub fn subscribe_to_raw_transactions(&self) -> EventStream { - self.raw_tx_sender.new_listener() + pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver { + self.raw_tx_sender.subscribe() } - /// Broadcasts raw transaction to all subscribers. + /// Broadcasts raw transaction if there are active subscribers. #[inline] pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { - self.raw_tx_sender.notify(raw_tx) + if self.raw_tx_sender.receiver_count() > 0 { + let _ = self.raw_tx_sender.send(raw_tx); + } } } From 110b1ce91144cfdccab006daa4cb108100b9386d Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Fri, 6 Dec 2024 13:38:41 +0100 Subject: [PATCH 04/10] trigger workflow --- crates/rpc/rpc/src/eth/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 325029fe807c..6039a2282f13 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -433,7 +433,7 @@ where &self.blocking_task_guard } - /// Returns [`broadcast::Receiver`] of new raw transactions. + /// Returns [`broadcast::Receiver`] of new raw transactions #[inline] pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver { self.raw_tx_sender.subscribe() From ac683863eb95299ee73e2e5238ae7e93f350c48e Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Fri, 6 Dec 2024 21:57:18 +0100 Subject: [PATCH 05/10] remove default send_raw_transaction implementation --- crates/optimism/rpc/src/eth/transaction.rs | 5 --- .../rpc-eth-api/src/helpers/transaction.rs | 41 ++++--------------- crates/rpc/rpc/src/eth/core.rs | 5 ++- crates/rpc/rpc/src/eth/helpers/transaction.rs | 32 +++++++++++---- 4 files changed, 36 insertions(+), 47 deletions(-) diff --git a/crates/optimism/rpc/src/eth/transaction.rs b/crates/optimism/rpc/src/eth/transaction.rs index 542e40cf73fc..c1e0a7301984 100644 --- a/crates/optimism/rpc/src/eth/transaction.rs +++ b/crates/optimism/rpc/src/eth/transaction.rs @@ -53,11 +53,6 @@ where Ok(hash) } - - #[inline] - fn broadcast_raw_transaction(&self, raw_tx: Bytes) { - self.inner.eth_api.broadcast_raw_transaction(raw_tx) - } } impl LoadTransaction for OpEthApi diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 64bb56033e19..f34329b5031c 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -17,10 +17,7 @@ use reth_provider::{ BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider, TransactionsProvider, }; -use reth_rpc_eth_types::{ - utils::{binary_search, recover_raw_transaction}, - EthApiError, SignError, TransactionSource, -}; +use reth_rpc_eth_types::{utils::binary_search, EthApiError, SignError, TransactionSource}; use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context}; use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; use std::sync::Arc; @@ -63,9 +60,13 @@ pub trait EthTransactions: LoadTransaction { #[expect(clippy::type_complexity)] fn signers(&self) -> &parking_lot::RwLock>>>>; - /// Broadcasts raw transaction to subscribed observers. Used internally by - /// `send_raw_transaction` to emit transactions as they are processed. - fn broadcast_raw_transaction(&self, raw_tx: Bytes); + /// Decodes and recovers the transaction and submits it to the pool. + /// + /// Returns the hash of the transaction. + fn send_raw_transaction( + &self, + tx: Bytes, + ) -> impl Future> + Send; /// Returns the transaction by hash. /// @@ -339,32 +340,6 @@ pub trait EthTransactions: LoadTransaction { } } - /// Decodes and recovers the transaction and submits it to the pool. - /// - /// Returns the hash of the transaction. - fn send_raw_transaction( - &self, - tx: Bytes, - ) -> impl Future> + Send { - async move { - let recovered = recover_raw_transaction(tx.clone())?; - let pool_transaction = - ::Transaction::from_pooled(recovered); - - // submit the transaction to the pool with a `Local` origin - let hash = self - .pool() - .add_transaction(TransactionOrigin::Local, pool_transaction) - .await - .map_err(Self::Error::from_eth_err)?; - - // broadcast raw transaction to subscribers if there is any. - self.broadcast_raw_transaction(tx); - - Ok(hash) - } - } - /// Signs transaction with a matching signer, if any and submits the transaction to the pool. /// Returns the hash of the signed transaction. fn send_transaction( diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index f568d6083f0b..58ef8c7e5aa1 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -27,6 +27,7 @@ use reth_tasks::{ TaskSpawner, TokioTaskExecutor, }; use tokio::sync::{broadcast, Mutex}; +use tracing::trace; use crate::eth::EthTxBuilder; @@ -446,8 +447,8 @@ where /// Broadcasts raw transaction if there are active subscribers. #[inline] pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { - if self.raw_tx_sender.receiver_count() > 0 { - let _ = self.raw_tx_sender.send(raw_tx); + if self.raw_tx_sender.send(raw_tx).is_err() { + trace!("no receivers for raw transaction"); } } } diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index f6cdf3fe7b75..fc6b6174ec03 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -1,13 +1,16 @@ //! Contains RPC handler implementations specific to transactions use crate::EthApi; -use alloy_primitives::Bytes; -use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider}; +use alloy_primitives::{Bytes, B256}; +use reth_provider::{ + BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider, +}; use reth_rpc_eth_api::{ helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, - FullEthApiTypes, RpcNodeCoreExt, + FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt, }; -use reth_transaction_pool::TransactionPool; +use reth_rpc_eth_types::utils::recover_raw_transaction; +use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; impl EthTransactions for EthApi @@ -20,9 +23,24 @@ where self.inner.signers() } - #[inline] - fn broadcast_raw_transaction(&self, raw_tx: Bytes) { - self.inner.broadcast_raw_transaction(raw_tx) + /// Decodes and recovers the transaction and submits it to the pool. + /// + /// Returns the hash of the transaction. + async fn send_raw_transaction(&self, tx: Bytes) -> Result { + let recovered = recover_raw_transaction(tx.clone())?; + let pool_transaction = ::Transaction::from_pooled(recovered); + + // submit the transaction to the pool with a `Local` origin + let hash = self + .pool() + .add_transaction(TransactionOrigin::Local, pool_transaction) + .await + .map_err(Self::Error::from_eth_err)?; + + // broadcast raw transaction to subscribers if there is any. + self.broadcast_raw_transaction(tx); + + Ok(hash) } } From eee16c6f6f33cb63ca22f9b273d55b8fc5b17044 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Fri, 6 Dec 2024 22:05:36 +0100 Subject: [PATCH 06/10] cargo fmt --all --- crates/rpc/rpc/src/eth/helpers/transaction.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index fc6b6174ec03..29700fd6b33d 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -2,9 +2,7 @@ use crate::EthApi; use alloy_primitives::{Bytes, B256}; -use reth_provider::{ - BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider, -}; +use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider}; use reth_rpc_eth_api::{ helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt, From b7bb9a3868b6eee09777b2e5e655ceabe002f560 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Fri, 6 Dec 2024 22:15:48 +0100 Subject: [PATCH 07/10] use slice --- crates/rpc/rpc/src/eth/helpers/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 29700fd6b33d..835c457e716a 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -25,7 +25,7 @@ where /// /// Returns the hash of the transaction. async fn send_raw_transaction(&self, tx: Bytes) -> Result { - let recovered = recover_raw_transaction(tx.clone())?; + let recovered = recover_raw_transaction(&tx)?; let pool_transaction = ::Transaction::from_pooled(recovered); // submit the transaction to the pool with a `Local` origin From c5051f202e7fe45c244b3c2fca80f2f91a7dd9e1 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Fri, 6 Dec 2024 22:19:34 +0100 Subject: [PATCH 08/10] cargo fmt --all --- crates/rpc/rpc-eth-api/src/helpers/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs index 0c29e472afe0..f34329b5031c 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/transaction.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/transaction.rs @@ -339,7 +339,7 @@ pub trait EthTransactions: LoadTransaction { Ok(None) } } - + /// Signs transaction with a matching signer, if any and submits the transaction to the pool. /// Returns the hash of the signed transaction. fn send_transaction( From 1b66059e10acc0b9a6465a56f57b57da9436d662 Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Wed, 11 Dec 2024 13:22:30 +0100 Subject: [PATCH 09/10] ::nit --- crates/rpc/rpc/src/eth/core.rs | 4 +--- crates/rpc/rpc/src/eth/helpers/transaction.rs | 7 ++++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 58ef8c7e5aa1..ff4d99328cb8 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -447,9 +447,7 @@ where /// Broadcasts raw transaction if there are active subscribers. #[inline] pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { - if self.raw_tx_sender.send(raw_tx).is_err() { - trace!("no receivers for raw transaction"); - } + let _ = self.raw_tx_sender.send(raw_tx); } } diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 835c457e716a..3fcdfb3f0681 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -26,6 +26,10 @@ where /// Returns the hash of the transaction. async fn send_raw_transaction(&self, tx: Bytes) -> Result { let recovered = recover_raw_transaction(&tx)?; + + // broadcast raw transaction to subscribers if there is any. + self.broadcast_raw_transaction(tx); + let pool_transaction = ::Transaction::from_pooled(recovered); // submit the transaction to the pool with a `Local` origin @@ -35,9 +39,6 @@ where .await .map_err(Self::Error::from_eth_err)?; - // broadcast raw transaction to subscribers if there is any. - self.broadcast_raw_transaction(tx); - Ok(hash) } } From 12cb8a915547858a12ab98bfe05c2013dc0a46db Mon Sep 17 00:00:00 2001 From: Ayodeji Akinola Date: Wed, 11 Dec 2024 14:40:05 +0100 Subject: [PATCH 10/10] fix: remove unused import --- crates/rpc/rpc/src/eth/core.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index ff4d99328cb8..0c8f06ba2b83 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -27,7 +27,6 @@ use reth_tasks::{ TaskSpawner, TokioTaskExecutor, }; use tokio::sync::{broadcast, Mutex}; -use tracing::trace; use crate::eth::EthTxBuilder;