Skip to content

Commit

Permalink
fix run loop definition and test
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Jul 12, 2024
1 parent 95bff44 commit 0b100ee
Showing 1 changed file with 55 additions and 50 deletions.
105 changes: 55 additions & 50 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,44 +326,39 @@ where
}

fn run(mut self) {
loop {
while let Ok(msg) = self.incoming.recv() {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncFinished => {
todo!()
}
FromOrchestrator::BackfillSyncStarted => {
todo!()
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into))
{
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(
e,
))
})) {
error!("Failed to send event: {err:?}");
}
while let Ok(msg) = self.incoming.recv() {
match msg {
FromEngine::Event(event) => match event {
FromOrchestrator::BackfillSyncFinished => {
todo!()
}
FromOrchestrator::BackfillSyncStarted => {
todo!()
}
},
FromEngine::Request(request) => match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let output = self.on_forkchoice_updated(state, payload_attrs);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) {
error!("Failed to send event: {err:?}");
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
todo!()
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
let output = self.on_new_payload(payload, cancun_fields);
if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(|e| {
reth_beacon_consensus::BeaconOnNewPayloadError::Internal(Box::new(e))
})) {
error!("Failed to send event: {err:?}");
}
},
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
error!("Failed to send event: {err:?}");
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
todo!()
}
},
FromEngine::DownloadedBlocks(blocks) => {
if let Some(event) = self.on_downloaded(blocks) {
if let Err(err) = self.outgoing.send(EngineApiEvent::FromTree(event)) {
error!("Failed to send event: {err:?}");
}
}
}
Expand Down Expand Up @@ -719,7 +714,8 @@ where
type Engine = T;

fn on_downloaded(&mut self, _blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
todo!()
debug!("not implemented");
None
}

fn on_new_payload(
Expand Down Expand Up @@ -889,13 +885,16 @@ mod tests {
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::test_utils::MockExecutorProvider;
use reth_provider::test_utils::MockEthProvider;
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::unbounded_channel;

fn get_default_tree(
persistence_handle: PersistenceHandle,
tree_state: TreeState,
) -> EngineApiTreeHandlerImpl<MockEthProvider, MockExecutorProvider, EthEngineTypes> {
) -> (
EngineApiTreeHandlerImpl<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
Sender<FromEngine<BeaconEngineMessage<EthEngineTypes>>>,
) {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
Expand All @@ -911,7 +910,7 @@ mod tests {

let payload_validator = ExecutionPayloadValidator::new(chain_spec);

let (_to_tree_tx, to_tree_rx) = channel();
let (to_tree_tx, to_tree_rx) = channel();
let (from_tree_tx, from_tree_rx) = unbounded_channel();

let engine_api_tree_state = EngineApiTreeState {
Expand All @@ -921,15 +920,18 @@ mod tests {
forkchoice_state_tracker: ForkchoiceStateTracker::default(),
};

EngineApiTreeHandlerImpl::new(
provider,
executor_factory,
consensus,
payload_validator,
to_tree_rx,
from_tree_tx,
engine_api_tree_state,
persistence_handle,
(
EngineApiTreeHandlerImpl::new(
provider,
executor_factory,
consensus,
payload_validator,
to_tree_rx,
from_tree_tx,
engine_api_tree_state,
persistence_handle,
),
to_tree_tx,
)
}

Expand All @@ -953,9 +955,12 @@ mod tests {
let (action_tx, action_rx) = channel();
let persistence_handle = PersistenceHandle::new(action_tx);

let tree = get_default_tree(persistence_handle, tree_state);
let (tree, to_tree_tx) = get_default_tree(persistence_handle, tree_state);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap();

// send a message to the tree
to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();

let received_action = action_rx.recv().expect("Failed to receive saved blocks");
if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action {
// only PERSISTENCE_THRESHOLD will be persisted
Expand Down

0 comments on commit 0b100ee

Please sign in to comment.