diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 17e784e14180..678502514d21 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -326,44 +326,39 @@ where } fn run(mut self) { - loop { - while let Ok(msg) = self.incoming.recv() { - match msg { - FromEngine::Event(event) => match event { - FromOrchestrator::BackfillSyncFinished => { - todo!() - } - FromOrchestrator::BackfillSyncStarted => { - todo!() - } - }, - FromEngine::Request(request) => match request { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - let output = self.on_forkchoice_updated(state, payload_attrs); - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) - { - error!("Failed to send event: {err:?}"); - } - } - BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { - let output = self.on_new_payload(payload, cancun_fields); - if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { - reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new( - e, - )) - })) { - error!("Failed to send event: {err:?}"); - } + while let Ok(msg) = self.incoming.recv() { + match msg { + FromEngine::Event(event) => match event { + FromOrchestrator::BackfillSyncFinished => { + todo!() + } + FromOrchestrator::BackfillSyncStarted => { + todo!() + } + }, + FromEngine::Request(request) => match request { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + let output = self.on_forkchoice_updated(state, payload_attrs); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) { + error!("Failed to send event: {err:?}"); } - BeaconEngineMessage::TransitionConfigurationExchanged => { - todo!() + } + BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { + let output = self.on_new_payload(payload, cancun_fields); + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| { + reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e)) + })) { + error!("Failed to send event: {err:?}"); } - }, - FromEngine::DownloadedBlocks(blocks) => { - if let Some(event) = self.on_downloaded(blocks) { - if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) { - error!("Failed to send event: {err:?}"); - } + } + BeaconEngineMessage::TransitionConfigurationExchanged => { + todo!() + } + }, + FromEngine::DownloadedBlocks(blocks) => { + if let Some(event) = self.on_downloaded(blocks) { + if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) { + error!("Failed to send event: {err:?}"); } } } @@ -719,7 +714,8 @@ where type Engine = T; fn on_downloaded(&mut self, _blocks: Vec) -> Option { - todo!() + debug!("not implemented"); + None } fn on_new_payload( @@ -889,13 +885,16 @@ mod tests { use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::test_utils::MockExecutorProvider; use reth_provider::test_utils::MockEthProvider; - use std::sync::mpsc::channel; + use std::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::unbounded_channel; fn get_default_tree( persistence_handle: PersistenceHandle, tree_state: TreeState, - ) -> EngineApiTreeHandlerImpl { + ) -> ( + EngineApiTreeHandlerImpl, + Sender>>, + ) { let chain_spec = Arc::new( ChainSpecBuilder::default() .chain(MAINNET.chain) @@ -911,7 +910,7 @@ mod tests { let payload_validator = ExecutionPayloadValidator::new(chain_spec); - let (_to_tree_tx, to_tree_rx) = channel(); + let (to_tree_tx, to_tree_rx) = channel(); let (from_tree_tx, from_tree_rx) = unbounded_channel(); let engine_api_tree_state = EngineApiTreeState { @@ -921,15 +920,18 @@ mod tests { forkchoice_state_tracker: ForkchoiceStateTracker::default(), }; - EngineApiTreeHandlerImpl::new( - provider, - executor_factory, - consensus, - payload_validator, - to_tree_rx, - from_tree_tx, - engine_api_tree_state, - persistence_handle, + ( + EngineApiTreeHandlerImpl::new( + provider, + executor_factory, + consensus, + payload_validator, + to_tree_rx, + from_tree_tx, + engine_api_tree_state, + persistence_handle, + ), + to_tree_tx, ) } @@ -953,9 +955,12 @@ mod tests { let (action_tx, action_rx) = channel(); let persistence_handle = PersistenceHandle::new(action_tx); - let tree = get_default_tree(persistence_handle, tree_state); + let (tree, to_tree_tx) = get_default_tree(persistence_handle, tree_state); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap(); + // send a message to the tree + to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); + let received_action = action_rx.recv().expect("Failed to receive saved blocks"); if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action { // only PERSISTENCE_THRESHOLD will be persisted