From 6215f6d56ae4da239938e1ffe712aa4aa6cea70a Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Fri, 5 Jul 2024 12:19:30 +0200 Subject: [PATCH] Updated block importer to allow more blocks to be queue (#2010) Before, we required all services to process the import result before committing a new result. It was done to reduce the gap between on-chain and off-chain databases and minimize the damage from the https://github.com/FuelLabs/fuel-core/issues/1584 problem. The https://github.com/FuelLabs/fuel-core/issues/1584 is no longer a problem with https://github.com/FuelLabs/fuel-core/pull/2004 fix. Because of that, we can allow committing more blocks in parallel while other services are processing old ones. It improves synchronization speed because we have a buffer before we wait for other services to catch up. It is very actual for cases when other services are busy right now with other work, but soon will be available to process `ImportResult`. The default value size of the buffer is `1024`. ### Before requesting review - [x] I have reviewed the code myself --------- Co-authored-by: Hannes Karppila <2204863+Dentosal@users.noreply.github.com> --- CHANGELOG.md | 1 + Cargo.lock | 1 + crates/services/importer/Cargo.toml | 1 + crates/services/importer/src/config.rs | 2 - crates/services/importer/src/importer.rs | 120 ++++++++++------------- 5 files changed, 55 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85107662d8a..60724909308 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [#1948](https://github.com/FuelLabs/fuel-core/pull/1948): Add new `AlgorithmV1` and `AlgorithmUpdaterV1` for the gas price. Include tools for analysis ### Changed +- [#2010](https://github.com/FuelLabs/fuel-core/pull/2010): Updated the block importer to allow more blocks to be in the queue. It improves synchronization speed and mitigate the impact of other services on synchronization speed. - [#2006](https://github.com/FuelLabs/fuel-core/pull/2006): Process block importer events first under P2P pressure. - [#2002](https://github.com/FuelLabs/fuel-core/pull/2002): Adapted the block producer to react to checked transactions that were using another version of consensus parameters during validation in the TxPool. After an upgrade of the consensus parameters of the network, TxPool could store invalid `Checked` transactions. This change fixes that by tracking the version that was used to validate the transactions. - [#1999](https://github.com/FuelLabs/fuel-core/pull/1999): Minimize the number of panics in the codebase. diff --git a/Cargo.lock b/Cargo.lock index 63e43578362..22cd957957a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3060,6 +3060,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-types", "mockall", + "parking_lot", "test-case", "tokio", "tokio-rayon", diff --git a/crates/services/importer/Cargo.toml b/crates/services/importer/Cargo.toml index 9d6e6d457ad..b6dae64a88f 100644 --- a/crates/services/importer/Cargo.toml +++ b/crates/services/importer/Cargo.toml @@ -15,6 +15,7 @@ derive_more = { workspace = true } fuel-core-metrics = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } +parking_lot = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-rayon = { workspace = true } tracing = { workspace = true } diff --git a/crates/services/importer/src/config.rs b/crates/services/importer/src/config.rs index e8347ae2ef9..f959724abb2 100644 --- a/crates/services/importer/src/config.rs +++ b/crates/services/importer/src/config.rs @@ -1,14 +1,12 @@ #[derive(Debug, Clone)] pub struct Config { pub max_block_notify_buffer: usize, - pub metrics: bool, } impl Config { pub fn new() -> Self { Self { max_block_notify_buffer: 1 << 10, - metrics: false, } } } diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index c31f5483cde..f77efef82d4 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -39,15 +39,13 @@ use fuel_core_types::{ Uncommitted, }, }; +use parking_lot::Mutex; use std::{ ops::{ Deref, DerefMut, }, - sync::{ - Arc, - Mutex, - }, + sync::Arc, time::{ Instant, SystemTime, @@ -56,7 +54,8 @@ use std::{ }; use tokio::sync::{ broadcast, - oneshot, + OwnedSemaphorePermit, + Semaphore, TryAcquireError, }; @@ -97,6 +96,7 @@ pub enum Error { #[from] StorageError(StorageError), UnsupportedConsensusVariant(String), + ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError), } impl From for anyhow::Error { @@ -118,11 +118,12 @@ pub struct Importer { verifier: Arc, chain_id: ChainId, broadcast: broadcast::Sender, - /// The channel to notify about the end of the processing of the previous block by all listeners. - /// It is used to await until all receivers of the notification process the `SharedImportResult` - /// before starting committing a new block. - prev_block_process_result: Mutex>>, - guard: tokio::sync::Semaphore, + guard: Semaphore, + /// The semaphore tracks the number of unprocessed `SharedImportResult`. + /// If the number of unprocessed results is more than the threshold, + /// the block importer stops committing new blocks and waits for + /// the resolution of the previous one. + active_import_results: Arc, } impl Importer { @@ -133,7 +134,11 @@ impl Importer { executor: E, verifier: V, ) -> Self { - let (broadcast, _) = broadcast::channel(config.max_block_notify_buffer); + // We use semaphore as a back pressure mechanism instead of a `broadcast` + // channel because we want to prevent committing to the database results + // that will not be processed. + let max_block_notify_buffer = config.max_block_notify_buffer; + let (broadcast, _) = broadcast::channel(max_block_notify_buffer); Self { database: Mutex::new(database), @@ -141,8 +146,8 @@ impl Importer { verifier: Arc::new(verifier), chain_id, broadcast, - prev_block_process_result: Default::default(), - guard: tokio::sync::Semaphore::new(1), + active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)), + guard: Semaphore::new(1), } } @@ -198,35 +203,31 @@ where result: UncommittedResult, ) -> Result<(), Error> { let _guard = self.lock()?; - // It is safe to unwrap the channel because we have the `_guard`. - let previous_block_result = self - .prev_block_process_result - .lock() - .expect("poisoned") - .take(); // Await until all receivers of the notification process the result. - if let Some(channel) = previous_block_result { - const TIMEOUT: u64 = 20; - let result = - tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel) - .await; + const TIMEOUT: u64 = 20; + let await_result = tokio::time::timeout( + tokio::time::Duration::from_secs(TIMEOUT), + self.active_import_results.clone().acquire_owned(), + ) + .await; - if result.is_err() { - tracing::error!( - "The previous block processing \ + let Ok(permit) = await_result else { + tracing::error!( + "The previous block processing \ was not finished for {TIMEOUT} seconds." - ); - return Err(Error::PreviousBlockProcessingNotFinished) - } - } + ); + return Err(Error::PreviousBlockProcessingNotFinished) + }; + let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?; + let mut guard = self .database .try_lock() .expect("Semaphore prevents concurrent access to the database"); let database = guard.deref_mut(); - self._commit_result(result, database) + self._commit_result(result, permit, database) } /// The method commits the result of the block execution and notifies about a new imported block. @@ -242,6 +243,7 @@ where fn _commit_result( &self, result: UncommittedResult, + permit: OwnedSemaphorePermit, database: &mut D, ) -> Result<(), Error> { let (result, changes) = result.into(); @@ -327,16 +329,12 @@ where tracing::info!("Committed block {:#x}", result.sealed_block.entity.id()); - // The `tokio::sync::oneshot::Sender` is used to notify about the end - // of the processing of a new block by all listeners. - let (sender, receiver) = oneshot::channel(); let result = ImporterResult { - shared_result: Arc::new(Awaiter::new(result, sender)), + shared_result: Arc::new(Awaiter::new(result, permit)), #[cfg(feature = "test-helpers")] changes: Arc::new(changes_clone), }; let _ = self.broadcast.send(result); - *self.prev_block_process_result.lock().expect("poisoned") = Some(receiver); Ok(()) } @@ -467,28 +465,22 @@ where let result = result?; - // It is safe to unwrap the channel because we have the `_guard`. - let previous_block_result = self - .prev_block_process_result - .lock() - .expect("poisoned") - .take(); - // Await until all receivers of the notification process the result. - if let Some(channel) = previous_block_result { - const TIMEOUT: u64 = 20; - let result = - tokio::time::timeout(tokio::time::Duration::from_secs(TIMEOUT), channel) - .await; + const TIMEOUT: u64 = 20; + let await_result = tokio::time::timeout( + tokio::time::Duration::from_secs(TIMEOUT), + self.active_import_results.clone().acquire_owned(), + ) + .await; - if result.is_err() { - tracing::error!( - "The previous block processing \ + let Ok(permit) = await_result else { + tracing::error!( + "The previous block processing \ was not finished for {TIMEOUT} seconds." - ); - return Err(Error::PreviousBlockProcessingNotFinished) - } - } + ); + return Err(Error::PreviousBlockProcessingNotFinished) + }; + let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?; let start = Instant::now(); @@ -497,7 +489,7 @@ where .try_lock() .expect("Semaphore prevents concurrent access to the database"); let database = guard.deref_mut(); - let commit_result = self._commit_result(result, database); + let commit_result = self._commit_result(result, permit, database); let commit_time = start.elapsed().as_secs_f64(); let time = execute_time + commit_time; importer_metrics().execute_and_commit_duration.observe(time); @@ -509,15 +501,7 @@ where /// The wrapper around `ImportResult` to notify about the end of the processing of a new block. struct Awaiter { result: ImportResult, - release_channel: Option>, -} - -impl Drop for Awaiter { - fn drop(&mut self) { - if let Some(release_channel) = core::mem::take(&mut self.release_channel) { - let _ = release_channel.send(()); - } - } + _permit: OwnedSemaphorePermit, } impl Deref for Awaiter { @@ -529,10 +513,10 @@ impl Deref for Awaiter { } impl Awaiter { - fn new(result: ImportResult, channel: oneshot::Sender<()>) -> Self { + fn new(result: ImportResult, permit: OwnedSemaphorePermit) -> Self { Self { result, - release_channel: Some(channel), + _permit: permit, } } }