Skip to content

Commit

Permalink
fix: bad unwrap on resolve (#8675)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jun 7, 2024
1 parent 4fc289b commit 977def8
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 44 deletions.
126 changes: 82 additions & 44 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where

let cached_reads = self.maybe_pre_cached(config.parent_block.hash());

Ok(BasicPayloadJob {
let mut job = BasicPayloadJob {
config,
client: self.client.clone(),
pool: self.pool.clone(),
Expand All @@ -193,7 +193,12 @@ where
payload_task_guard: self.payload_task_guard.clone(),
metrics: Default::default(),
builder: self.builder.clone(),
})
};

// start the first job right away
job.spawn_build_job();

Ok(job)
}

fn on_new_state(&mut self, new_state: CanonStateNotification) {
Expand Down Expand Up @@ -347,6 +352,48 @@ where
builder: Builder,
}

impl<Client, Pool, Tasks, Builder> BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Pool: TransactionPool + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder<Pool, Client> + Unpin + 'static,
<Builder as PayloadBuilder<Pool, Client>>::Attributes: Unpin + Clone,
<Builder as PayloadBuilder<Pool, Client>>::BuiltPayload: Unpin + Clone,
{
/// Spawns a new payload build task.
fn spawn_build_job(&mut self) {
trace!(target: "payload_builder", "spawn new payload build task");
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
let pool = self.pool.clone();
let cancel = Cancelled::default();
let _cancel = cancel.clone();
let guard = self.payload_task_guard.clone();
let payload_config = self.config.clone();
let best_payload = self.best_payload.clone();
self.metrics.inc_initiated_payload_builds();
let cached_reads = self.cached_reads.take().unwrap_or_default();
let builder = self.builder.clone();
self.executor.spawn_blocking(Box::pin(async move {
// acquire the permit for executing the task
let _permit = guard.acquire().await;
let args = BuildArguments {
client,
pool,
cached_reads,
config: payload_config,
cancel,
best_payload,
};
let result = builder.try_build(args);
let _ = tx.send(result);
}));

self.pending_block = Some(PendingPayload { _cancel, payload: rx });
}
}

impl<Client, Pool, Tasks, Builder> Future for BasicPayloadJob<Client, Pool, Tasks, Builder>
where
Client: StateProviderFactory + Clone + Unpin + 'static,
Expand All @@ -371,34 +418,7 @@ where
while this.interval.poll_tick(cx).is_ready() {
// start a new job if there is no pending block and we haven't reached the deadline
if this.pending_block.is_none() {
trace!(target: "payload_builder", "spawn new payload build task");
let (tx, rx) = oneshot::channel();
let client = this.client.clone();
let pool = this.pool.clone();
let cancel = Cancelled::default();
let _cancel = cancel.clone();
let guard = this.payload_task_guard.clone();
let payload_config = this.config.clone();
let best_payload = this.best_payload.clone();
this.metrics.inc_initiated_payload_builds();
let cached_reads = this.cached_reads.take().unwrap_or_default();
let builder = this.builder.clone();
this.executor.spawn_blocking(Box::pin(async move {
// acquire the permit for executing the task
let _permit = guard.acquire().await;
let args = BuildArguments {
client,
pool,
cached_reads,
config: payload_config,
cancel,
best_payload,
};
let result = builder.try_build(args);
let _ = tx.send(result);
}));

this.pending_block = Some(PendingPayload { _cancel, payload: rx });
this.spawn_build_job();
}
}

Expand Down Expand Up @@ -470,6 +490,12 @@ where

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let best_payload = self.best_payload.take();

if best_payload.is_none() && self.pending_block.is_none() {
// ensure we have a job scheduled if we don't have a best payload yet and none is active
self.spawn_build_job();
}

let maybe_better = self.pending_block.take();
let mut empty_payload = None;

Expand Down Expand Up @@ -539,10 +565,16 @@ pub struct ResolveBestPayload<Payload> {
pub best_payload: Option<Payload>,
/// Regular payload job that's currently running that might produce a better payload.
pub maybe_better: Option<PendingPayload<Payload>>,
/// The empty payload building job in progress.
/// The empty payload building job in progress, if any.
pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
}

impl<Payload> ResolveBestPayload<Payload> {
const fn is_empty(&self) -> bool {
self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none()
}
}

impl<Payload> Future for ResolveBestPayload<Payload>
where
Payload: Unpin,
Expand All @@ -568,22 +600,28 @@ where
return Poll::Ready(Ok(best))
}

let mut empty_payload = this.empty_payload.take().expect("polled after completion");
match empty_payload.poll_unpin(cx) {
Poll::Ready(Ok(res)) => {
if let Err(err) = &res {
warn!(target: "payload_builder", %err, "failed to resolve empty payload");
} else {
debug!(target: "payload_builder", "resolving empty payload");
if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() {
if let Poll::Ready(res) = fut.poll(cx) {
this.empty_payload = None;
return match res {
Ok(res) => {
if let Err(err) = &res {
warn!(target: "payload_builder", %err, "failed to resolve empty payload");
} else {
debug!(target: "payload_builder", "resolving empty payload");
}
Poll::Ready(res)
}
Err(err) => Poll::Ready(Err(err.into())),
}
Poll::Ready(res)
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => {
this.empty_payload = Some(empty_payload);
Poll::Pending
}
}

if this.is_empty() {
return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
}

Poll::Pending
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/payload/builder/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub enum PayloadBuilderError {
/// An oneshot channels has been closed.
#[error("sender has been dropped")]
ChannelClosed,
/// If there's no payload to resolve.
#[error("missing payload")]
MissingPayload,
/// Error occurring in the blob store.
#[error(transparent)]
BlobStore(#[from] BlobStoreError),
Expand Down

0 comments on commit 977def8

Please sign in to comment.