Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: on-disk reorg E2E test #12977

Merged
merged 7 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions crates/e2e-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn setup<N>(
chain_spec: Arc<N::ChainSpec>,
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadBuilderAttributes + Copy + 'static,
) -> eyre::Result<(Vec<NodeHelperType<N, N::AddOns>>, TaskManager, Wallet)>
) -> eyre::Result<(Vec<NodeHelperType<N>>, TaskManager, Wallet)>
where
N: Default + Node<TmpNodeAdapter<N>> + NodeTypesForTree + NodeTypesWithEngine,
N::ComponentsBuilder: NodeComponentsBuilder<
Expand Down Expand Up @@ -115,7 +115,7 @@ pub async fn setup_engine<N>(
is_dev: bool,
attributes_generator: impl Fn(u64) -> <<N as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadBuilderAttributes + Copy + 'static,
) -> eyre::Result<(
Vec<NodeHelperType<N, N::AddOns, BlockchainProvider2<NodeTypesWithDBAdapter<N, TmpDB>>>>,
Vec<NodeHelperType<N, BlockchainProvider2<NodeTypesWithDBAdapter<N, TmpDB>>>>,
TaskManager,
Wallet,
)>
Expand Down Expand Up @@ -183,6 +183,9 @@ where

let mut node = NodeTestContext::new(node, attributes_generator).await?;

let genesis = node.block_hash(0);
node.engine_api.update_forkchoice(genesis, genesis).await?;

// Connect each node in a chain.
if let Some(previous_node) = nodes.last_mut() {
previous_node.connect(&mut node).await;
Expand All @@ -203,7 +206,8 @@ where

// Type aliases

type TmpDB = Arc<TempDatabase<DatabaseEnv>>;
/// Testing database
pub type TmpDB = Arc<TempDatabase<DatabaseEnv>>;
type TmpNodeAdapter<N, Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>> =
FullNodeTypesAdapter<NodeTypesWithDBAdapter<N, TmpDB>, Provider>;

Expand All @@ -216,5 +220,5 @@ pub type Adapter<N, Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, TmpD
>;

/// Type alias for a type of `NodeHelper`
pub type NodeHelperType<N, AO, Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>> =
NodeTestContext<Adapter<N, Provider>, AO>;
pub type NodeHelperType<N, Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, TmpDB>>> =
NodeTestContext<Adapter<N, Provider>, <N as Node<TmpNodeAdapter<N, Provider>>>::AddOns>;
60 changes: 55 additions & 5 deletions crates/e2e-test-utils/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
rpc::RpcTestContext, traits::PayloadEnvelopeExt,
};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockId;
use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
use alloy_rpc_types_engine::PayloadStatusEnum;
use alloy_rpc_types_eth::BlockNumberOrTag;
Expand Down Expand Up @@ -134,8 +135,8 @@ where
Ok((self.payload.expect_built_payload().await?, eth_attr))
}

/// Advances the node forward one block
pub async fn advance_block(
/// Triggers payload building job and submits it to the engine.
pub async fn build_and_submit_payload(
&mut self,
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
where
Expand All @@ -146,13 +147,27 @@ where
{
let (payload, eth_attr) = self.new_payload().await?;

let block_hash = self
.engine_api
self.engine_api
.submit_payload(payload.clone(), eth_attr.clone(), PayloadStatusEnum::Valid)
.await?;

Ok((payload, eth_attr))
}

/// Advances the node forward one block
pub async fn advance_block(
&mut self,
) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
where
<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
From<Engine::BuiltPayload> + PayloadEnvelopeExt,
<Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
From<Engine::BuiltPayload> + PayloadEnvelopeExt,
{
let (payload, eth_attr) = self.build_and_submit_payload().await?;

// trigger forkchoice update via engine api to commit the block to the blockchain
self.engine_api.update_forkchoice(block_hash, block_hash).await?;
self.engine_api.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;

Ok((payload, eth_attr))
}
Expand Down Expand Up @@ -238,6 +253,41 @@ where
Ok(())
}

/// Gets block hash by number.
pub fn block_hash(&self, number: u64) -> BlockHash {
self.inner
.provider
.sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
.unwrap()
.unwrap()
.hash()
}

/// Sends FCU and waits for the node to sync to the given block.
pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
self.engine_api.update_forkchoice(block, block).await?;

let start = std::time::Instant::now();

while self
.inner
.provider
.sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
.is_none_or(|h| h.hash() != block)
{
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

assert!(start.elapsed() <= std::time::Duration::from_secs(10), "timed out");
}

// Hack to make sure that all components have time to process canonical state update.
// Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
// making tests flaky.
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

Ok(())
}

/// Returns the RPC URL.
pub fn rpc_url(&self) -> Url {
let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
Expand Down
143 changes: 54 additions & 89 deletions crates/ethereum/node/tests/e2e/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
use crate::utils::eth_payload_attributes;
use alloy_consensus::TxType;
use alloy_primitives::bytes;
use alloy_provider::{
network::{
Ethereum, EthereumWallet, NetworkWallet, TransactionBuilder, TransactionBuilder7702,
},
Provider, ProviderBuilder, SendableTx,
};
use alloy_rpc_types_eth::TransactionRequest;
use alloy_signer::SignerSync;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use crate::utils::{advance_with_random_transactions, eth_payload_attributes};
use alloy_provider::{Provider, ProviderBuilder};
use rand::{rngs::StdRng, Rng, SeedableRng};
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_e2e_test_utils::{setup, setup_engine, transaction::TransactionTestContext};
use reth_node_ethereum::EthereumNode;
use revm::primitives::{AccessListItem, Authorization};
use std::sync::Arc;

#[tokio::test]
Expand Down Expand Up @@ -76,80 +66,12 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {
.build(),
);

let (mut nodes, _tasks, wallet) =
let (mut nodes, _tasks, _) =
setup_engine::<EthereumNode>(2, chain_spec.clone(), false, eth_payload_attributes).await?;
let mut node = nodes.pop().unwrap();
let signers = wallet.gen();
let provider = ProviderBuilder::new().with_recommended_fillers().on_http(node.rpc_url());

// simple contract which writes to storage on any call
let dummy_bytecode = bytes!("6080604052348015600f57600080fd5b50602880601d6000396000f3fe4360a09081523360c0526040608081905260e08152902080805500fea164736f6c6343000810000a");
let mut call_destinations = signers.iter().map(|s| s.address()).collect::<Vec<_>>();

// Produce 100 random blocks with random transactions
for _ in 0..100 {
let tx_count = rng.gen_range(1..20);

let mut pending = vec![];
for _ in 0..tx_count {
let signer = signers.choose(&mut rng).unwrap();
let tx_type = TxType::try_from(rng.gen_range(0..=4)).unwrap();

let mut tx = TransactionRequest::default().with_from(signer.address());

let should_create =
rng.gen::<bool>() && tx_type != TxType::Eip4844 && tx_type != TxType::Eip7702;
if should_create {
tx = tx.into_create().with_input(dummy_bytecode.clone());
} else {
tx = tx.with_to(*call_destinations.choose(&mut rng).unwrap()).with_input(
(0..rng.gen_range(0..10000)).map(|_| rng.gen()).collect::<Vec<u8>>(),
);
}

if matches!(tx_type, TxType::Legacy | TxType::Eip2930) {
tx = tx.with_gas_price(provider.get_gas_price().await?);
}

if rng.gen::<bool>() || tx_type == TxType::Eip2930 {
tx = tx.with_access_list(
vec![AccessListItem {
address: *call_destinations.choose(&mut rng).unwrap(),
storage_keys: (0..rng.gen_range(0..100)).map(|_| rng.gen()).collect(),
}]
.into(),
);
}

if tx_type == TxType::Eip7702 {
let signer = signers.choose(&mut rng).unwrap();
let auth = Authorization {
chain_id: provider.get_chain_id().await?,
address: *call_destinations.choose(&mut rng).unwrap(),
nonce: provider.get_transaction_count(signer.address()).await?,
};
let sig = signer.sign_hash_sync(&auth.signature_hash())?;
tx = tx.with_authorization_list(vec![auth.into_signed(sig)])
}

let SendableTx::Builder(tx) = provider.fill(tx).await? else { unreachable!() };
let tx =
NetworkWallet::<Ethereum>::sign_request(&EthereumWallet::new(signer.clone()), tx)
.await?;

pending.push(provider.send_tx_envelope(tx).await?);
}

let (payload, _) = node.advance_block().await?;
assert!(payload.block().raw_transactions().len() == tx_count);

for pending in pending {
let receipt = pending.get_receipt().await?;
if let Some(address) = receipt.contract_address {
call_destinations.push(address);
}
}
}
advance_with_random_transactions(&mut node, 100, &mut rng, true).await?;

let second_node = nodes.pop().unwrap();
let second_provider =
Expand All @@ -159,15 +81,58 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {

let head =
provider.get_block_by_number(Default::default(), false.into()).await?.unwrap().header.hash;
second_node.engine_api.update_forkchoice(head, head).await?;

let start = std::time::Instant::now();
second_node.sync_to(head).await?;

while provider.get_block_number().await? != second_provider.get_block_number().await? {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(())
}

#[tokio::test]
async fn test_long_reorg() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let seed: [u8; 32] = rand::thread_rng().gen();
let mut rng = StdRng::from_seed(seed);
println!("Seed: {:?}", seed);

let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(serde_json::from_str(include_str!("../assets/genesis.json")).unwrap())
.cancun_activated()
.prague_activated()
.build(),
);

let (mut nodes, _tasks, _) =
setup_engine::<EthereumNode>(2, chain_spec.clone(), false, eth_payload_attributes).await?;

let mut first_node = nodes.pop().unwrap();
let mut second_node = nodes.pop().unwrap();

let first_provider = ProviderBuilder::new().on_http(first_node.rpc_url());

// Advance first node 100 blocks.
advance_with_random_transactions(&mut first_node, 100, &mut rng, false).await?;

// Sync second node to 20th block.
let head = first_provider.get_block_by_number(20.into(), false.into()).await?.unwrap();
second_node.sync_to(head.header.hash).await?;

// Produce a fork chain with blocks 21.60
second_node.payload.timestamp = head.header.timestamp;
advance_with_random_transactions(&mut second_node, 40, &mut rng, true).await?;

// Reorg first node from 100th block to new 60th block.
first_node.sync_to(second_node.block_hash(60)).await?;

// Advance second node 20 blocks and ensure that first node is able to follow it.
advance_with_random_transactions(&mut second_node, 20, &mut rng, true).await?;
first_node.sync_to(second_node.block_hash(80)).await?;

assert!(start.elapsed() <= std::time::Duration::from_secs(10), "timed out");
}
// Ensure that it works the other way around too.
advance_with_random_transactions(&mut first_node, 20, &mut rng, true).await?;
second_node.sync_to(first_node.block_hash(100)).await?;

Ok(())
}
Loading
Loading