Skip to content

Commit

Permalink
back to the future
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jun 7, 2024
1 parent e085c91 commit b9fde18
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions examples/exex/in-memory-state/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::info;
use std::{
future::Future,
pin::{pin, Pin},
task::{Context, Poll},
pin::Pin,
task::{ready, Context, Poll},
};

/// An ExEx that keeps track of the entire state in memory
Expand All @@ -30,36 +30,32 @@ impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
type Output = eyre::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
pin!(self.get_mut().start()).poll(cx)
}
}
let this = self.get_mut();

impl<Node: FullNodeComponents> InMemoryStateExEx<Node> {
async fn start(&mut self) -> eyre::Result<()> {
while let Some(notification) = self.ctx.notifications.recv().await {
while let Some(notification) = ready!(this.ctx.notifications.poll_recv(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
// revert to block before the reorg
self.state.revert_to(new.first().number - 1);
this.state.revert_to(new.first().number - 1);
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
self.state.revert_to(old.first().number - 1);
this.state.revert_to(old.first().number - 1);
info!(reverted_chain = ?old.range(), "Received revert");
}
};

if let Some(committed_chain) = notification.committed_chain() {
// extend the state with the new chain
self.state.extend(committed_chain.state().clone());
self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
this.state.extend(committed_chain.state().clone());
this.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}

Ok(())
Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -90,27 +86,34 @@ mod tests {
#[tokio::test]
async fn test_exex() -> eyre::Result<()> {
let mut rng = &mut generators::rng();
let (ctx, handle) = test_exex_context().await?;

let (ctx, handle) = test_exex_context().await?;
let mut exex = pin!(super::InMemoryStateExEx::new(ctx));

let mut state = BundleStateWithReceipts::default();
let mut expected_state = BundleStateWithReceipts::default();

// Generate first block and its state
let block_1 = random_block(&mut rng, 0, None, Some(1), None)
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
let block_number_1 = block_1.number;
let state_1 = BundleStateWithReceipts::new(
BundleState::default(),
Receipts::from_block_receipt(vec![random_receipt(&mut rng, &block_1.body[0], None)]),
block_1.number,
);
state.extend(state_1.clone());
// Extend the expected state with the first block
expected_state.extend(state_1.clone());

// Send a notification to the Execution Extension that the chain with the first block has
// been committed
handle.send_notification_chain_committed(Chain::new(vec![block_1], state_1, None)).await?;
exex.poll_once().await?;

assert_eq!(exex.as_mut().state, state);
// Assert that the state of the first block has been added to the total state
assert_eq!(exex.as_mut().state, expected_state);

// Generate second block and its state
let block_2 = random_block(&mut rng, 1, None, Some(2), None)
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
Expand All @@ -119,12 +122,26 @@ mod tests {
Receipts::from_block_receipt(vec![random_receipt(&mut rng, &block_2.body[0], None)]),
block_2.number,
);
state.extend(state_2.clone());
// Extend the expected state with the second block
expected_state.extend(state_2.clone());

// Send a notification to the Execution Extension that the chain with the second block has
// been committed
let chain_2 = Chain::new(vec![block_2], state_2, None);
handle.send_notification_chain_committed(chain_2.clone()).await?;
exex.poll_once().await?;

// Assert that the state of the second block has been added to the total state
assert_eq!(exex.as_mut().state, expected_state);

handle.send_notification_chain_committed(Chain::new(vec![block_2], state_2, None)).await?;
// Send a notification to the Execution Extension that the chain with the second block has
// been reverted
handle.send_notification_chain_reverted(chain_2).await?;
exex.poll_once().await?;

assert_eq!(exex.as_mut().state, state);
// Assert that the state of the second block has been reverted
expected_state.revert_to(block_number_1);
assert_eq!(exex.as_mut().state, expected_state);

Ok(())
}
Expand Down

0 comments on commit b9fde18

Please sign in to comment.