Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No longer spawn subtasks for block requests in consensus_service #1491

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 38 additions & 59 deletions full-node/src/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ use crate::{database_thread, jaeger_service, network_service, LogCallback, LogLe
use core::num::NonZeroU32;
use futures_channel::{mpsc, oneshot};
use futures_lite::FutureExt as _;
use futures_util::{future, stream, SinkExt as _, StreamExt as _};
use futures_util::{
future,
stream::{self, FuturesUnordered},
SinkExt as _, StreamExt as _,
};
use hashbrown::HashSet;
use smol::lock::Mutex;
use smoldot::{
Expand All @@ -48,8 +52,10 @@ use smoldot::{
use std::{
array,
borrow::Cow,
iter, mem,
future::Future,
iter,
num::{NonZeroU64, NonZeroUsize},
pin::Pin,
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
Expand Down Expand Up @@ -174,7 +180,7 @@ pub enum InitError {

impl ConsensusService {
/// Initializes the [`ConsensusService`] with the given configuration.
pub async fn new(config: Config) -> Result<Arc<Self>, InitError> {
pub async fn new(mut config: Config) -> Result<Arc<Self>, InitError> {
// Perform the initial access to the database to load a bunch of information.
let (
finalized_block_number,
Expand Down Expand Up @@ -324,7 +330,6 @@ impl ConsensusService {

let block_author_sync_source = sync.add_source(None, best_block_number, best_block_hash);

let (block_requests_finished_tx, block_requests_finished_rx) = mpsc::channel(0);
let (to_background_tx, to_background_rx) = mpsc::channel(4);

let background_sync = SyncBackground {
Expand All @@ -342,14 +347,12 @@ impl ConsensusService {
from_network_service: config.network_events_receiver,
database: config.database,
peers_source_id_map: Default::default(),
tasks_executor: config.tasks_executor,
sub_tasks: FuturesUnordered::new(),
log_callback: config.log_callback,
block_requests_finished_tx,
block_requests_finished_rx,
jaeger_service: config.jaeger_service,
};

background_sync.start();
(config.tasks_executor)(Box::pin(background_sync.run()));

Ok(Arc::new(ConsensusService {
block_number_bytes: config.block_number_bytes,
Expand Down Expand Up @@ -584,8 +587,7 @@ struct SyncBackground {
/// This "trick" is necessary in order to not cancel requests that have already been started
/// against a peer when it disconnects and that might already have a response.
///
/// Each on-going request has a corresponding background task that sends its result to
/// [`SyncBackground::block_requests_finished_rx`].
/// Each on-going request has a corresponding background task in [`SyncBackground::sub_tasks`].
sync: all::AllSync<(), Option<NetworkSourceInfo>, NonFinalizedBlock>,

/// Source within the [`SyncBackground::sync`] to use to import locally-authored blocks.
Expand Down Expand Up @@ -646,27 +648,12 @@ struct SyncBackground {
/// source are removed.
peers_source_id_map: hashbrown::HashMap<libp2p::PeerId, all::SourceId, fnv::FnvBuildHasher>,

/// See [`Config::tasks_executor`].
tasks_executor: Box<dyn FnMut(future::BoxFuture<'static, ()>) + Send>,
/// Futures that get executed by the background task.
sub_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = SubtaskFinished> + Send>>>,

/// See [`Config::log_callback`].
log_callback: Arc<dyn LogCallback + Send + Sync>,

/// Block requests that have been emitted on the networking service and that are still in
/// progress. Each entry in this field also has an entry in [`SyncBackground::sync`].
block_requests_finished_rx: mpsc::Receiver<(
all::RequestId,
all::SourceId,
Result<Vec<BlockData>, network_service::BlocksRequestError>,
)>,

/// Sending side of [`SyncBackground::block_requests_finished_rx`].
block_requests_finished_tx: mpsc::Sender<(
all::RequestId,
all::SourceId,
Result<Vec<BlockData>, network_service::BlocksRequestError>,
)>,

/// See [`Config::database`].
database: Arc<database_thread::DatabaseThread>,

Expand Down Expand Up @@ -696,22 +683,15 @@ struct NetworkSourceInfo {
is_disconnected: bool,
}

impl SyncBackground {
fn start(mut self) {
// This function is a small hack because I didn't find a better way to store the executor
// within `Background` while at the same time spawning the `Background` using said
// executor.
let mut actual_executor =
mem::replace(&mut self.tasks_executor, Box::new(|_| unreachable!()));
let (tx, rx) = oneshot::channel();
actual_executor(Box::pin(async move {
let actual_executor = rx.await.unwrap();
self.tasks_executor = actual_executor;
self.run().await;
}));
tx.send(actual_executor).unwrap_or_else(|_| panic!());
}
enum SubtaskFinished {
BlocksRequestFinished {
request_id: all::RequestId,
source_id: all::SourceId,
result: Result<Vec<BlockData>, network_service::BlocksRequestError>,
},
}

impl SyncBackground {
async fn run(mut self) {
let mut process_sync = true;

Expand All @@ -723,11 +703,7 @@ impl SyncBackground {
FrontendEvent(ToBackground),
FrontendClosed,
NetworkEvent(network_service::Event),
RequestFinished(
all::RequestId,
all::SourceId,
Result<Vec<BlockData>, network_service::BlocksRequestError>,
),
SubtaskFinished(SubtaskFinished),
SyncProcess,
}

Expand Down Expand Up @@ -838,9 +814,10 @@ impl SyncBackground {
WakeUpReason::NetworkEvent(self.from_network_service.next().await.unwrap())
})
.or(async {
let (request_id, source_id, result) =
self.block_requests_finished_rx.select_next_some().await;
WakeUpReason::RequestFinished(request_id, source_id, result)
let Some(subtask_finished) = self.sub_tasks.next().await else {
future::pending().await
};
WakeUpReason::SubtaskFinished(subtask_finished)
})
.or(async {
if !process_sync {
Expand Down Expand Up @@ -1059,7 +1036,11 @@ impl SyncBackground {
// Different chain index.
}

WakeUpReason::RequestFinished(request_id, source_id, result) => {
WakeUpReason::SubtaskFinished(SubtaskFinished::BlocksRequestFinished {
request_id,
source_id,
result,
}) => {
// TODO: clarify this piece of code
let result = result.map_err(|_| ());
let (_, response_outcome) = self.sync.blocks_request_response(
Expand Down Expand Up @@ -1572,14 +1553,12 @@ impl SyncBackground {

let request_id = self.sync.add_request(source_id, request_info.into(), ());

(self.tasks_executor)(Box::pin({
let mut block_requests_finished_tx =
self.block_requests_finished_tx.clone();
async move {
let result = request.await;
let _ = block_requests_finished_tx
.send((request_id, source_id, result))
.await;
self.sub_tasks.push(Box::pin(async move {
let result = request.await;
SubtaskFinished::BlocksRequestFinished {
request_id,
source_id,
result,
}
}));
}
Expand Down