Skip to content

Commit

Permalink
pipeline: error on missing buffer in online stages (#5480)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Nov 18, 2023
1 parent 4555dc1 commit 6ded643
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
5 changes: 5 additions & 0 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub enum StageError {
/// Invalid checkpoint passed to the stage
#[error("invalid stage checkpoint: {0}")]
StageCheckpoint(u64),
/// Missing download buffer on stage execution.
/// Returned if stage execution was called without polling for readiness.
#[error("missing download buffer")]
MissingDownloadBuffer,
/// Download channel closed
#[error("download channel closed")]
ChannelClosed,
Expand Down Expand Up @@ -97,6 +101,7 @@ impl StageError {
StageError::Download(_) |
StageError::DatabaseIntegrity(_) |
StageError::StageCheckpoint(_) |
StageError::MissingDownloadBuffer |
StageError::MissingSyncGap |
StageError::ChannelClosed |
StageError::Fatal(_)
Expand Down
16 changes: 9 additions & 7 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ pub struct BodyStage<D: BodyDownloader> {
/// The body downloader.
downloader: D,
/// Block response buffer.
buffer: Vec<BlockResponse>,
buffer: Option<Vec<BlockResponse>>,
}

impl<D: BodyDownloader> BodyStage<D> {
/// Create new bodies stage from downloader.
pub fn new(downloader: D) -> Self {
Self { downloader, buffer: Vec::new() }
Self { downloader, buffer: None }
}
}

Expand All @@ -71,7 +71,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
cx: &mut Context<'_>,
input: ExecInput,
) -> Poll<Result<(), StageError>> {
if input.target_reached() || !self.buffer.is_empty() {
if input.target_reached() || self.buffer.is_some() {
return Poll::Ready(Ok(()))
}

Expand All @@ -85,7 +85,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// is a fatal error to prevent the pipeline from running forever.
let response = match maybe_next_result {
Some(Ok(downloaded)) => {
self.buffer.extend(downloaded);
self.buffer = Some(downloaded);
Ok(())
}
Some(Err(err)) => Err(err.into()),
Expand Down Expand Up @@ -118,9 +118,11 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
let mut next_tx_num = tx_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, start_tx_id = next_tx_num, "Commencing sync");
trace!(target: "sync::stages::bodies", bodies_len = self.buffer.len(), "Writing blocks");

let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
let mut highest_block = from_block;
for response in self.buffer.drain(..) {
for response in buffer {
// Write block
let block_number = response.block_number();

Expand Down Expand Up @@ -186,7 +188,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.buffer.clear();
self.buffer.take();

let tx = provider.tx_ref();
// Cursors to unwind bodies, ommers
Expand Down
18 changes: 10 additions & 8 deletions crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct HeaderStage<Provider, Downloader: HeaderDownloader> {
/// Current sync gap.
sync_gap: Option<HeaderSyncGap>,
/// Header buffer.
buffer: Vec<SealedHeader>,
buffer: Option<Vec<SealedHeader>>,
}

// === impl HeaderStage ===
Expand All @@ -55,7 +55,7 @@ where
{
/// Create a new header stage
pub fn new(database: Provider, downloader: Downloader, mode: HeaderSyncMode) -> Self {
Self { provider: database, downloader, mode, sync_gap: None, buffer: Vec::new() }
Self { provider: database, downloader, mode, sync_gap: None, buffer: None }
}

fn is_stage_done<DB: Database>(
Expand Down Expand Up @@ -126,7 +126,8 @@ where
let current_checkpoint = input.checkpoint();

// Return if buffer already has some items.
if !self.buffer.is_empty() {
if self.buffer.is_some() {
// TODO: review
trace!(
target: "sync::stages::headers",
checkpoint = %current_checkpoint.block_number,
Expand Down Expand Up @@ -159,7 +160,7 @@ where
let result = match ready!(self.downloader.poll_next_unpin(cx)) {
Some(Ok(headers)) => {
info!(target: "sync::stages::headers", len = headers.len(), "Received headers");
self.buffer.extend(headers);
self.buffer = Some(headers);
Ok(())
}
Some(Err(HeadersDownloaderError::DetachedHead { local_head, header, error })) => {
Expand All @@ -179,15 +180,16 @@ where
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let current_checkpoint = input.checkpoint();
if self.buffer.is_empty() {

let gap = self.sync_gap.clone().ok_or(StageError::MissingSyncGap)?;
if gap.is_closed() {
return Ok(ExecOutput::done(current_checkpoint))
}

let gap = self.sync_gap.clone().ok_or(StageError::MissingSyncGap)?;
let local_head = gap.local_head.number;
let tip = gap.target.tip();

let downloaded_headers = std::mem::take(&mut self.buffer);
let downloaded_headers = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
let tip_block_number = match tip {
// If tip is hash and it equals to the first downloaded header's hash, we can use
// the block number of this header as tip.
Expand Down Expand Up @@ -280,7 +282,7 @@ where
provider: &DatabaseProviderRW<'_, &DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.buffer.clear();
self.buffer.take();
self.sync_gap.take();

provider.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
Expand Down

0 comments on commit 6ded643

Please sign in to comment.