diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 3c60d27de296..9954b39a8011 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -180,7 +180,7 @@ where let cached_reads = self.maybe_pre_cached(config.parent_block.hash()); - Ok(BasicPayloadJob { + let mut job = BasicPayloadJob { config, client: self.client.clone(), pool: self.pool.clone(), @@ -193,7 +193,12 @@ where payload_task_guard: self.payload_task_guard.clone(), metrics: Default::default(), builder: self.builder.clone(), - }) + }; + + // start the first job right away + job.spawn_build_job(); + + Ok(job) } fn on_new_state(&mut self, new_state: CanonStateNotification) { @@ -347,6 +352,48 @@ where builder: Builder, } +impl BasicPayloadJob +where + Client: StateProviderFactory + Clone + Unpin + 'static, + Pool: TransactionPool + Unpin + 'static, + Tasks: TaskSpawner + Clone + 'static, + Builder: PayloadBuilder + Unpin + 'static, + >::Attributes: Unpin + Clone, + >::BuiltPayload: Unpin + Clone, +{ + /// Spawns a new payload build task. + fn spawn_build_job(&mut self) { + trace!(target: "payload_builder", "spawn new payload build task"); + let (tx, rx) = oneshot::channel(); + let client = self.client.clone(); + let pool = self.pool.clone(); + let cancel = Cancelled::default(); + let _cancel = cancel.clone(); + let guard = self.payload_task_guard.clone(); + let payload_config = self.config.clone(); + let best_payload = self.best_payload.clone(); + self.metrics.inc_initiated_payload_builds(); + let cached_reads = self.cached_reads.take().unwrap_or_default(); + let builder = self.builder.clone(); + self.executor.spawn_blocking(Box::pin(async move { + // acquire the permit for executing the task + let _permit = guard.acquire().await; + let args = BuildArguments { + client, + pool, + cached_reads, + config: payload_config, + cancel, + best_payload, + }; + let result = builder.try_build(args); + let _ = tx.send(result); + })); + + self.pending_block = Some(PendingPayload { _cancel, payload: rx }); + } +} + impl Future for BasicPayloadJob where Client: StateProviderFactory + Clone + Unpin + 'static, @@ -371,34 +418,7 @@ where while this.interval.poll_tick(cx).is_ready() { // start a new job if there is no pending block and we haven't reached the deadline if this.pending_block.is_none() { - trace!(target: "payload_builder", "spawn new payload build task"); - let (tx, rx) = oneshot::channel(); - let client = this.client.clone(); - let pool = this.pool.clone(); - let cancel = Cancelled::default(); - let _cancel = cancel.clone(); - let guard = this.payload_task_guard.clone(); - let payload_config = this.config.clone(); - let best_payload = this.best_payload.clone(); - this.metrics.inc_initiated_payload_builds(); - let cached_reads = this.cached_reads.take().unwrap_or_default(); - let builder = this.builder.clone(); - this.executor.spawn_blocking(Box::pin(async move { - // acquire the permit for executing the task - let _permit = guard.acquire().await; - let args = BuildArguments { - client, - pool, - cached_reads, - config: payload_config, - cancel, - best_payload, - }; - let result = builder.try_build(args); - let _ = tx.send(result); - })); - - this.pending_block = Some(PendingPayload { _cancel, payload: rx }); + this.spawn_build_job(); } } @@ -470,6 +490,12 @@ where fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) { let best_payload = self.best_payload.take(); + + if best_payload.is_none() && self.pending_block.is_none() { + // ensure we have a job scheduled if we don't have a best payload yet and none is active + self.spawn_build_job(); + } + let maybe_better = self.pending_block.take(); let mut empty_payload = None; @@ -539,10 +565,16 @@ pub struct ResolveBestPayload { pub best_payload: Option, /// Regular payload job that's currently running that might produce a better payload. pub maybe_better: Option>, - /// The empty payload building job in progress. + /// The empty payload building job in progress, if any. pub empty_payload: Option>>, } +impl ResolveBestPayload { + const fn is_empty(&self) -> bool { + self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none() + } +} + impl Future for ResolveBestPayload where Payload: Unpin, @@ -568,22 +600,28 @@ where return Poll::Ready(Ok(best)) } - let mut empty_payload = this.empty_payload.take().expect("polled after completion"); - match empty_payload.poll_unpin(cx) { - Poll::Ready(Ok(res)) => { - if let Err(err) = &res { - warn!(target: "payload_builder", %err, "failed to resolve empty payload"); - } else { - debug!(target: "payload_builder", "resolving empty payload"); + if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() { + if let Poll::Ready(res) = fut.poll(cx) { + this.empty_payload = None; + return match res { + Ok(res) => { + if let Err(err) = &res { + warn!(target: "payload_builder", %err, "failed to resolve empty payload"); + } else { + debug!(target: "payload_builder", "resolving empty payload"); + } + Poll::Ready(res) + } + Err(err) => Poll::Ready(Err(err.into())), } - Poll::Ready(res) - } - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), - Poll::Pending => { - this.empty_payload = Some(empty_payload); - Poll::Pending } } + + if this.is_empty() { + return Poll::Ready(Err(PayloadBuilderError::MissingPayload)) + } + + Poll::Pending } } diff --git a/crates/payload/builder/src/error.rs b/crates/payload/builder/src/error.rs index eee8899cf1d9..8c430000f7c3 100644 --- a/crates/payload/builder/src/error.rs +++ b/crates/payload/builder/src/error.rs @@ -14,6 +14,9 @@ pub enum PayloadBuilderError { /// An oneshot channels has been closed. #[error("sender has been dropped")] ChannelClosed, + /// If there's no payload to resolve. + #[error("missing payload")] + MissingPayload, /// Error occurring in the blob store. #[error(transparent)] BlobStore(#[from] BlobStoreError),