From 39c53823e4ccc6d75d37add7a934288833a32644 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 12 Jul 2024 18:16:00 -0400 Subject: [PATCH] feat: route commands to correct persistence service (#9435) Co-authored-by: Federico Gimenez Co-authored-by: Matthias Seitz --- crates/engine/tree/src/database.rs | 255 ++++++++++++++++++ crates/engine/tree/src/lib.rs | 6 +- crates/engine/tree/src/persistence.rs | 350 +++++++++---------------- crates/engine/tree/src/static_files.rs | 31 +-- crates/engine/tree/src/test_utils.rs | 2 +- crates/engine/tree/src/tree/mod.rs | 10 +- 6 files changed, 395 insertions(+), 259 deletions(-) create mode 100644 crates/engine/tree/src/database.rs diff --git a/crates/engine/tree/src/database.rs b/crates/engine/tree/src/database.rs new file mode 100644 index 000000000000..1dfcec70abf8 --- /dev/null +++ b/crates/engine/tree/src/database.rs @@ -0,0 +1,255 @@ +#![allow(dead_code)] + +use crate::{static_files::StaticFileServiceHandle, tree::ExecutedBlock}; +use reth_db::database::Database; +use reth_errors::ProviderResult; +use reth_primitives::B256; +use reth_provider::{ + bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown, + ProviderFactory, StageCheckpointWriter, StateWriter, +}; +use reth_prune::{PruneProgress, Pruner}; +use reth_stages_types::{StageCheckpoint, StageId}; +use std::sync::mpsc::{Receiver, SendError, Sender}; +use tokio::sync::oneshot; +use tracing::debug; + +/// Writes parts of reth's in memory tree state to the database. +/// +/// This is meant to be a spawned service that listens for various incoming database operations, +/// performing those actions on disk, and returning the result in a channel. +/// +/// There are two types of operations this service can perform: +/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted. +/// - Removing blocks from disk, returning the hash of the lowest block removed. +/// +/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs +/// blocking database operations in an endless loop. +#[derive(Debug)] +pub struct DatabaseService { + /// The db / static file provider to use + provider: ProviderFactory, + /// Incoming requests to persist stuff + incoming: Receiver, + /// Handle for the static file service. + static_file_handle: StaticFileServiceHandle, + /// The pruner + pruner: Pruner>, +} + +impl DatabaseService { + /// Create a new database service + pub const fn new( + provider: ProviderFactory, + incoming: Receiver, + static_file_handle: StaticFileServiceHandle, + pruner: Pruner>, + ) -> Self { + Self { provider, incoming, static_file_handle, pruner } + } + + /// Writes the cloned tree state to the database + fn write(&self, blocks: Vec) -> ProviderResult<()> { + let provider_rw = self.provider.provider_rw()?; + + if blocks.is_empty() { + debug!(target: "tree::persistence::db", "Attempted to write empty block range"); + return Ok(()) + } + + let first_number = blocks.first().unwrap().block().number; + + let last = blocks.last().unwrap().block(); + let last_block_number = last.number; + + // TODO: remove all the clones and do performant / batched writes for each type of object + // instead of a loop over all blocks, + // meaning: + // * blocks + // * state + // * hashed state + // * trie updates (cannot naively extend, need helper) + // * indices (already done basically) + // Insert the blocks + for block in blocks { + let sealed_block = + block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); + provider_rw.insert_block(sealed_block)?; + + // Write state and changesets to the database. + // Must be written after blocks because of the receipt lookup. + let execution_outcome = block.execution_outcome().clone(); + execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?; + + // insert hashes and intermediate merkle nodes + { + let trie_updates = block.trie_updates().clone(); + let hashed_state = block.hashed_state(); + HashedStateChanges(hashed_state.clone()).write_to_db(&provider_rw)?; + trie_updates.write_to_database(provider_rw.tx_ref())?; + } + + // update history indices + provider_rw.update_history_indices(first_number..=last_block_number)?; + + // Update pipeline progress + provider_rw.update_pipeline_stages(last_block_number, false)?; + } + + debug!(target: "tree::persistence::db", range = ?first_number..=last_block_number, "Appended blocks"); + + Ok(()) + } + + /// Removes block data above the given block number from the database. + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// Returns the block hash for the lowest block removed from the database, which should be + /// the hash for `block_number + 1`. + /// + /// This will then send a command to the static file service, to remove the actual block data. + fn remove_blocks_above(&self, block_number: u64) -> ProviderResult { + todo!("depends on PR") + // let mut provider_rw = self.provider.provider_rw()?; + // provider_rw.get_or_take_block_and_execution_range(range); + } + + /// Prunes block data before the given block hash according to the configured prune + /// configuration. + fn prune_before(&mut self, block_num: u64) -> PruneProgress { + // TODO: doing this properly depends on pruner segment changes + self.pruner.run(block_num).expect("todo: handle errors") + } + + /// Updates checkpoints related to block headers and bodies. This should be called by the static + /// file service, after new transactions have been successfully written to disk. + fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { + let provider_rw = self.provider.provider_rw()?; + provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; + provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; + provider_rw.commit()?; + Ok(()) + } +} + +impl DatabaseService +where + DB: Database, +{ + /// This is the main loop, that will listen to database events and perform the requested + /// database actions + pub fn run(mut self) { + // If the receiver errors then senders have disconnected, so the loop should then end. + while let Ok(action) = self.incoming.recv() { + match action { + DatabaseAction::RemoveBlocksAbove((new_tip_num, sender)) => { + let output = + self.remove_blocks_above(new_tip_num).expect("todo: handle errors"); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(output); + } + DatabaseAction::SaveBlocks((blocks, sender)) => { + if blocks.is_empty() { + todo!("return error or something"); + } + let last_block_hash = blocks.last().unwrap().block().hash(); + self.write(blocks).unwrap(); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(last_block_hash); + } + DatabaseAction::PruneBefore((block_num, sender)) => { + let res = self.prune_before(block_num); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(res); + } + DatabaseAction::UpdateTransactionMeta((block_num, sender)) => { + self.update_transaction_meta(block_num).expect("todo: handle errors"); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(()); + } + } + } + } +} + +/// A signal to the database service that part of the tree state can be persisted. +#[derive(Debug)] +pub enum DatabaseAction { + /// The section of tree state that should be persisted. These blocks are expected in order of + /// increasing block number. + /// + /// This should just store the execution history-related data. Header, transaction, and + /// receipt-related data should already be written to static files. + SaveBlocks((Vec, oneshot::Sender)), + + /// Updates checkpoints related to block headers and bodies. This should be called by the + /// static file service, after new transactions have been successfully written to disk. + UpdateTransactionMeta((u64, oneshot::Sender<()>)), + + /// Removes block data above the given block number from the database. + /// + /// This will then send a command to the static file service, to remove the actual block data. + /// + /// Returns the block hash for the lowest block removed from the database. + RemoveBlocksAbove((u64, oneshot::Sender)), + + /// Prune associated block data before the given block number, according to already-configured + /// prune modes. + PruneBefore((u64, oneshot::Sender)), +} + +/// A handle to the database service +#[derive(Debug, Clone)] +pub struct DatabaseServiceHandle { + /// The channel used to communicate with the database service + sender: Sender, +} + +impl DatabaseServiceHandle { + /// Create a new [`DatabaseServiceHandle`] from a [`Sender`]. + pub const fn new(sender: Sender) -> Self { + Self { sender } + } + + /// Sends a specific [`DatabaseAction`] in the contained channel. The caller is responsible + /// for creating any channels for the given action. + pub fn send_action(&self, action: DatabaseAction) -> Result<(), SendError> { + self.sender.send(action) + } + + /// Tells the database service to save a certain list of finalized blocks. The blocks are + /// assumed to be ordered by block number. + /// + /// This returns the latest hash that has been saved, allowing removal of that block and any + /// previous blocks from in-memory data structures. + pub async fn save_blocks(&self, blocks: Vec) -> B256 { + let (tx, rx) = oneshot::channel(); + self.sender.send(DatabaseAction::SaveBlocks((blocks, tx))).expect("should be able to send"); + rx.await.expect("todo: err handling") + } + + /// Tells the database service to remove blocks above a certain block number. The removed + /// blocks are returned by the service. + pub async fn remove_blocks_above(&self, block_num: u64) -> B256 { + let (tx, rx) = oneshot::channel(); + self.sender + .send(DatabaseAction::RemoveBlocksAbove((block_num, tx))) + .expect("should be able to send"); + rx.await.expect("todo: err handling") + } + + /// Tells the database service to remove block data before the given hash, according to the + /// configured prune config. + pub async fn prune_before(&self, block_num: u64) -> PruneProgress { + let (tx, rx) = oneshot::channel(); + self.sender + .send(DatabaseAction::PruneBefore((block_num, tx))) + .expect("should be able to send"); + rx.await.expect("todo: err handling") + } +} diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index 6540269f7d64..b4ac74992c21 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -20,15 +20,17 @@ pub use reth_blockchain_tree_api::*; pub mod backfill; /// The type that drives the chain forward. pub mod chain; +/// The background writer service for batch db writes. +pub mod database; /// Support for downloading blocks on demand for live sync. pub mod download; /// Engine Api chain handler support. pub mod engine; /// Metrics support. pub mod metrics; -/// The background writer task for batch db writes. +/// The background writer service, coordinating the static file and database services. pub mod persistence; -/// The background writer task for static file writes. +/// The background writer service for static file writes. pub mod static_files; /// Support for interacting with the blockchain tree. pub mod tree; diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index f63003a31ba3..a51906e78f2a 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,206 +1,27 @@ #![allow(dead_code)] -use crate::{static_files::StaticFileServiceHandle, tree::ExecutedBlock}; -use reth_db::database::Database; -use reth_errors::ProviderResult; -use reth_primitives::B256; -use reth_provider::{ - bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown, - ProviderFactory, StageCheckpointWriter, StateWriter, +use crate::{ + database::{DatabaseAction, DatabaseService, DatabaseServiceHandle}, + static_files::{StaticFileAction, StaticFileService, StaticFileServiceHandle}, + tree::ExecutedBlock, }; +use reth_db::Database; +use reth_primitives::{SealedBlock, B256, U256}; +use reth_provider::ProviderFactory; use reth_prune::{PruneProgress, Pruner}; -use reth_stages_types::{StageCheckpoint, StageId}; -use std::sync::mpsc::{Receiver, SendError, Sender}; +use std::sync::{ + mpsc::{SendError, Sender}, + Arc, +}; use tokio::sync::oneshot; -use tracing::debug; - -/// Writes parts of reth's in memory tree state to the database. -/// -/// This is meant to be a spawned service that listens for various incoming persistence operations, -/// performing those actions on disk, and returning the result in a channel. -/// -/// There are two types of operations this service can perform: -/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted. -/// - Removing blocks from disk, returning the removed blocks. -/// -/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs -/// blocking database operations in an endless loop. -#[derive(Debug)] -pub struct Persistence { - /// The db / static file provider to use - provider: ProviderFactory, - /// Incoming requests to persist stuff - incoming: Receiver, - /// Handle for the static file service. - static_file_handle: StaticFileServiceHandle, - /// The pruner - pruner: Pruner>, -} - -impl Persistence { - /// Create a new persistence service - const fn new( - provider: ProviderFactory, - incoming: Receiver, - static_file_handle: StaticFileServiceHandle, - pruner: Pruner>, - ) -> Self { - Self { provider, incoming, static_file_handle, pruner } - } - - /// Writes the cloned tree state to the database - fn write(&self, blocks: Vec) -> ProviderResult<()> { - let provider_rw = self.provider.provider_rw()?; - - if blocks.is_empty() { - debug!(target: "tree::persistence", "Attempted to write empty block range"); - return Ok(()) - } - - let first_number = blocks.first().unwrap().block().number; - - let last = blocks.last().unwrap().block(); - let last_block_number = last.number; - - // TODO: remove all the clones and do performant / batched writes for each type of object - // instead of a loop over all blocks, - // meaning: - // * blocks - // * state - // * hashed state - // * trie updates (cannot naively extend, need helper) - // * indices (already done basically) - // Insert the blocks - for block in blocks { - let sealed_block = - block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); - provider_rw.insert_block(sealed_block)?; - - // Write state and changesets to the database. - // Must be written after blocks because of the receipt lookup. - let execution_outcome = block.execution_outcome().clone(); - execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?; - - // insert hashes and intermediate merkle nodes - { - let trie_updates = block.trie_updates().clone(); - let hashed_state = block.hashed_state(); - HashedStateChanges(hashed_state.clone()).write_to_db(&provider_rw)?; - trie_updates.write_to_database(provider_rw.tx_ref())?; - } - - // update history indices - provider_rw.update_history_indices(first_number..=last_block_number)?; - - // Update pipeline progress - provider_rw.update_pipeline_stages(last_block_number, false)?; - } - - debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended blocks"); - - Ok(()) - } - - /// Removes block data above the given block number from the database. - /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove - /// `block_number`. - /// - /// Returns the block hash for the lowest block removed from the database, which should be - /// the hash for `block_number + 1`. - /// - /// This will then send a command to the static file service, to remove the actual block data. - fn remove_blocks_above(&self, block_number: u64) -> ProviderResult { - todo!("depends on PR") - // let mut provider_rw = self.provider.provider_rw()?; - // provider_rw.get_or_take_block_and_execution_range(range); - } - - /// Prunes block data before the given block hash according to the configured prune - /// configuration. - fn prune_before(&mut self, block_num: u64) -> PruneProgress { - // TODO: doing this properly depends on pruner segment changes - self.pruner.run(block_num).expect("todo: handle errors") - } - - /// Updates checkpoints related to block headers and bodies. This should be called by the static - /// file service, after new transactions have been successfully written to disk. - fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { - let provider_rw = self.provider.provider_rw()?; - provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; - provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; - provider_rw.commit()?; - Ok(()) - } -} - -impl Persistence -where - DB: Database + 'static, -{ - /// Create a new persistence service, spawning it, and returning a [`PersistenceHandle`]. - fn spawn_new( - provider: ProviderFactory, - static_file_handle: StaticFileServiceHandle, - pruner: Pruner>, - ) -> PersistenceHandle { - let (tx, rx) = std::sync::mpsc::channel(); - let service = Self::new(provider, rx, static_file_handle, pruner); - std::thread::Builder::new() - .name("Persistence Service".to_string()) - .spawn(|| service.run()) - .unwrap(); - - PersistenceHandle::new(tx) - } -} - -impl Persistence -where - DB: Database, -{ - /// This is the main loop, that will listen to persistence events and perform the requested - /// database actions - fn run(mut self) { - // If the receiver errors then senders have disconnected, so the loop should then end. - while let Ok(action) = self.incoming.recv() { - match action { - PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => { - let output = - self.remove_blocks_above(new_tip_num).expect("todo: handle errors"); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(output); - } - PersistenceAction::SaveBlocks((blocks, sender)) => { - if blocks.is_empty() { - todo!("return error or something"); - } - let last_block_hash = blocks.last().unwrap().block().hash(); - self.write(blocks).unwrap(); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(last_block_hash); - } - PersistenceAction::PruneBefore((block_num, sender)) => { - let res = self.prune_before(block_num); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(res); - } - PersistenceAction::UpdateTransactionMeta((block_num, sender)) => { - self.update_transaction_meta(block_num).expect("todo: handle errors"); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(()); - } - } - } - } -} -/// A signal to the persistence service that part of the tree state can be persisted. +/// A signal to the database and static file services that part of the tree state can be persisted. #[derive(Debug)] pub enum PersistenceAction { + /// The given block has been added to the canonical chain, its transactions and headers will be + /// persisted for durability. + LogTransactions((Arc, u64, U256, oneshot::Sender<()>)), + /// The section of tree state that should be persisted. These blocks are expected in order of /// increasing block number. /// @@ -208,14 +29,8 @@ pub enum PersistenceAction { /// receipt-related data should already be written to static files. SaveBlocks((Vec, oneshot::Sender)), - /// Updates checkpoints related to block headers and bodies. This should be called by the - /// static file service, after new transactions have been successfully written to disk. - UpdateTransactionMeta((u64, oneshot::Sender<()>)), - /// Removes block data above the given block number from the database. /// - /// This will then send a command to the static file service, to remove the actual block data. - /// /// Returns the block hash for the lowest block removed from the database. RemoveBlocksAbove((u64, oneshot::Sender)), @@ -224,26 +39,111 @@ pub enum PersistenceAction { PruneBefore((u64, oneshot::Sender)), } -/// A handle to the persistence service +/// An error type for when there is a [`SendError`] while sending an action to one of the services. #[derive(Debug)] +pub enum PersistenceSendError { + /// When there is an error sending to the static file service + StaticFile(SendError), + /// When there is an error sending to the database service + Database(SendError), +} + +impl From> for PersistenceSendError { + fn from(value: SendError) -> Self { + Self::StaticFile(value) + } +} + +impl From> for PersistenceSendError { + fn from(value: SendError) -> Self { + Self::Database(value) + } +} + +/// A handle to the database and static file services. This will send commands to the correct +/// service, depending on the command. +/// +/// Some commands should be sent to the database service, and others should be sent to the static +/// file service, despite having the same name. This is because some actions require work to be done +/// by both the static file _and_ the database service, and require some coordination. +/// +/// This type is what actually coordinates the two services, and should be used by consumers of the +/// persistence related services. +#[derive(Debug, Clone)] pub struct PersistenceHandle { - /// The channel used to communicate with the persistence service - sender: Sender, + /// The channel used to communicate with the database service + db_sender: Sender, + /// The channel used to communicate with the static file service + static_file_sender: Sender, } impl PersistenceHandle { /// Create a new [`PersistenceHandle`] from a [`Sender`]. - pub const fn new(sender: Sender) -> Self { - Self { sender } + pub const fn new( + db_sender: Sender, + static_file_sender: Sender, + ) -> Self { + Self { db_sender, static_file_sender } + } + + /// Create a new [`PersistenceHandle`], and spawn the database and static file services. + pub fn spawn_services( + provider_factory: ProviderFactory, + pruner: Pruner>, + ) -> Self { + // create the initial channels + let (static_file_service_tx, static_file_service_rx) = std::sync::mpsc::channel(); + let (db_service_tx, db_service_rx) = std::sync::mpsc::channel(); + + // construct persistence handle + let persistence_handle = Self::new(db_service_tx.clone(), static_file_service_tx.clone()); + + // construct handles for the services to talk to each other + let static_file_handle = StaticFileServiceHandle::new(static_file_service_tx); + let database_handle = DatabaseServiceHandle::new(db_service_tx); + + // spawn the db service + let db_service = DatabaseService::new( + provider_factory.clone(), + db_service_rx, + static_file_handle, + pruner, + ); + std::thread::Builder::new() + .name("Database Service".to_string()) + .spawn(|| db_service.run()) + .unwrap(); + + // spawn the static file service + let static_file_service = + StaticFileService::new(provider_factory, static_file_service_rx, database_handle); + std::thread::Builder::new() + .name("Static File Service".to_string()) + .spawn(|| static_file_service.run()) + .unwrap(); + + persistence_handle } /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible /// for creating any channels for the given action. - pub fn send_action( - &self, - action: PersistenceAction, - ) -> Result<(), SendError> { - self.sender.send(action) + pub fn send_action(&self, action: PersistenceAction) -> Result<(), PersistenceSendError> { + match action { + PersistenceAction::LogTransactions(input) => self + .static_file_sender + .send(StaticFileAction::LogTransactions(input)) + .map_err(From::from), + PersistenceAction::SaveBlocks(input) => self + .static_file_sender + .send(StaticFileAction::WriteExecutionData(input)) + .map_err(From::from), + PersistenceAction::RemoveBlocksAbove(input) => { + self.db_sender.send(DatabaseAction::RemoveBlocksAbove(input)).map_err(From::from) + } + PersistenceAction::PruneBefore(input) => { + self.db_sender.send(DatabaseAction::PruneBefore(input)).map_err(From::from) + } + } } /// Tells the persistence service to save a certain list of finalized blocks. The blocks are @@ -257,8 +157,7 @@ impl PersistenceHandle { let _ = tx.send(B256::default()); return; } - self.sender - .send(PersistenceAction::SaveBlocks((blocks, tx))) + self.send_action(PersistenceAction::SaveBlocks((blocks, tx))) .expect("should be able to send"); } @@ -266,8 +165,7 @@ impl PersistenceHandle { /// blocks are returned by the service. pub async fn remove_blocks_above(&self, block_num: u64) -> B256 { let (tx, rx) = oneshot::channel(); - self.sender - .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) + self.send_action(PersistenceAction::RemoveBlocksAbove((block_num, tx))) .expect("should be able to send"); rx.await.expect("todo: err handling") } @@ -276,8 +174,7 @@ impl PersistenceHandle { /// configured prune config. pub async fn prune_before(&self, block_num: u64) -> PruneProgress { let (tx, rx) = oneshot::channel(); - self.sender - .send(PersistenceAction::PruneBefore((block_num, tx))) + self.send_action(PersistenceAction::PruneBefore((block_num, tx))) .expect("should be able to send"); rx.await.expect("todo: err handling") } @@ -287,22 +184,14 @@ impl PersistenceHandle { mod tests { use super::*; use crate::test_utils::get_executed_block_with_number; - use reth_chainspec::MAINNET; - use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; use reth_exex_types::FinishedExExHeight; use reth_primitives::B256; - use reth_provider::{providers::StaticFileProvider, ProviderFactory}; + use reth_provider::{test_utils::create_test_provider_factory, ProviderFactory}; use reth_prune::Pruner; - use std::sync::mpsc::channel; fn default_persistence_handle() -> PersistenceHandle { - let db = create_test_rw_db(); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); - let provider = ProviderFactory::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ); + let provider = create_test_provider_factory(); + let (finished_exex_height_tx, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs); @@ -315,10 +204,7 @@ mod tests { finished_exex_height_rx, ); - let (static_file_sender, _static_file_receiver) = channel(); - let static_file_handle = StaticFileServiceHandle::new(static_file_sender); - - Persistence::spawn_new(provider, static_file_handle, pruner) + PersistenceHandle::spawn_services(provider, pruner) } #[tokio::test] diff --git a/crates/engine/tree/src/static_files.rs b/crates/engine/tree/src/static_files.rs index acc022a9ac28..09ca48ad0283 100644 --- a/crates/engine/tree/src/static_files.rs +++ b/crates/engine/tree/src/static_files.rs @@ -4,7 +4,6 @@ use reth_db::database::Database; use reth_errors::ProviderResult; use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; use reth_provider::{ProviderFactory, StaticFileProviderFactory, StaticFileWriter}; -use reth_prune::PruneModes; use std::sync::{ mpsc::{Receiver, SendError, Sender}, Arc, @@ -12,7 +11,7 @@ use std::sync::{ use tokio::sync::oneshot; use crate::{ - persistence::{PersistenceAction, PersistenceHandle}, + database::{DatabaseAction, DatabaseServiceHandle}, tree::ExecutedBlock, }; @@ -28,28 +27,22 @@ pub struct StaticFileService { /// The db / static file provider to use provider: ProviderFactory, /// Handle for the database service - database_handle: PersistenceHandle, + database_handle: DatabaseServiceHandle, /// Incoming requests to write static files incoming: Receiver, - /// The pruning configuration - pruning: PruneModes, } impl StaticFileService where DB: Database + 'static, { - /// Create a new static file service, spawning it, and returning a [`StaticFileServiceHandle`]. - fn spawn_new(provider: ProviderFactory) -> StaticFileServiceHandle { - todo!("implement initialization first"); - // let (tx, rx) = std::sync::mpsc::channel(); - // let service = Self::new(provider, rx); - // std::thread::Builder::new() - // .name("StaticFile Service".to_string()) - // .spawn(|| service.run()) - // .unwrap(); - - // StaticFileServiceHandle::new(tx) + /// Create a new static file service. + pub const fn new( + provider: ProviderFactory, + incoming: Receiver, + database_handle: DatabaseServiceHandle, + ) -> Self { + Self { provider, database_handle, incoming } } // TODO: some things about this are a bit weird, and just to make the underlying static file @@ -94,7 +87,7 @@ where // send a command to the db service to update the checkpoints for headers / bodies let _ = self .database_handle - .send_action(PersistenceAction::UpdateTransactionMeta((block.number, sender))); + .send_action(DatabaseAction::UpdateTransactionMeta((block.number, sender))); Ok(()) } @@ -133,7 +126,7 @@ where // TODO: do we care about the mpsc error here? // send a command to the db service to update the checkpoints for execution etc. - let _ = self.database_handle.send_action(PersistenceAction::SaveBlocks((blocks, sender))); + let _ = self.database_handle.send_action(DatabaseAction::SaveBlocks((blocks, sender))); Ok(()) } @@ -176,7 +169,7 @@ where { /// This is the main loop, that will listen to static file actions, and write DB data to static /// files. - fn run(self) { + pub fn run(self) { // If the receiver errors then senders have disconnected, so the loop should then end. while let Ok(action) = self.incoming.recv() { match action { diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs index 3ab7d31cdbaf..2d55fa276ea0 100644 --- a/crates/engine/tree/src/test_utils.rs +++ b/crates/engine/tree/src/test_utils.rs @@ -99,7 +99,7 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu Arc::new(sealed_with_senders.senders), Arc::new(ExecutionOutcome::new( BundleState::default(), - Receipts { receipt_vec: vec![] }, + Receipts { receipt_vec: vec![vec![]] }, block_number, vec![Requests::default()], )), diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 81b69dfb1842..386a58ed7b04 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -878,8 +878,7 @@ impl PersistenceState { #[cfg(test)] mod tests { use super::*; - use crate::{persistence::PersistenceAction, test_utils::get_executed_blocks}; - + use crate::{static_files::StaticFileAction, test_utils::get_executed_blocks}; use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_ethereum_engine_primitives::EthEngineTypes; @@ -954,7 +953,8 @@ mod tests { let tree_state = TreeState { blocks_by_hash, blocks_by_number }; let (action_tx, action_rx) = channel(); - let persistence_handle = PersistenceHandle::new(action_tx); + let (sf_action_tx, sf_action_rx) = channel(); + let persistence_handle = PersistenceHandle::new(action_tx, sf_action_tx); let (tree, to_tree_tx) = get_default_tree(persistence_handle, tree_state); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap(); @@ -962,8 +962,8 @@ mod tests { // send a message to the tree to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); - let received_action = action_rx.recv().expect("Failed to receive saved blocks"); - if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action { + let received_action = sf_action_rx.recv().expect("Failed to receive saved blocks"); + if let StaticFileAction::WriteExecutionData((saved_blocks, _)) = received_action { // only PERSISTENCE_THRESHOLD will be persisted blocks.pop(); assert_eq!(saved_blocks, blocks);