From 53bb37d3228c7844226f85de0836192f9e7f95e7 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 27 Aug 2024 21:37:29 +0200 Subject: [PATCH 1/2] Fix receipts import, fix chunked read of file with optional block data --- crates/net/downloaders/src/file_client.rs | 82 ++++++++++--- crates/net/downloaders/src/lib.rs | 2 + .../downloaders/src/receipt_file_client.rs | 115 +++++++++--------- 3 files changed, 126 insertions(+), 73 deletions(-) diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index eaf39267755f..3923597377ce 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -1,4 +1,5 @@ -use super::file_codec::BlockFileCodec; +use std::{collections::HashMap, io, path::Path}; + use futures::Future; use itertools::Either; use reth_network_p2p::{ @@ -12,13 +13,16 @@ use reth_network_peers::PeerId; use reth_primitives::{ BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, SealedHeader, B256, }; -use std::{collections::HashMap, io, path::Path}; use thiserror::Error; use tokio::{fs::File, io::AsyncReadExt}; use tokio_stream::StreamExt; use tokio_util::codec::FramedRead; use tracing::{debug, trace, warn}; +use crate::receipt_file_client::FromReceiptReader; + +use super::file_codec::BlockFileCodec; + /// Default byte length of chunk to read from chain file. /// /// Default is 1 GB. @@ -85,7 +89,7 @@ impl FileClient { let mut reader = vec![]; file.read_to_end(&mut reader).await?; - Ok(Self::from_reader(&reader[..], file_len).await?.0) + Ok(Self::from_reader(&reader[..], file_len).await?.file_client) } /// Get the tip hash of the chain. @@ -184,7 +188,7 @@ impl FromReader for FileClient { fn from_reader( reader: B, num_bytes: u64, - ) -> impl Future), Self::Error>> + ) -> impl Future, Self::Error>> where B: AsyncReadExt + Unpin, { @@ -247,7 +251,11 @@ impl FromReader for FileClient { trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client"); - Ok((Self { headers, hash_to_number, bodies }, remaining_bytes)) + Ok(DecodedFileChunk { + file_client: Self { headers, hash_to_number, bodies }, + remaining_bytes, + highest_block: None, + }) } } } @@ -349,6 +357,9 @@ pub struct ChunkedFileReader { chunk: Vec, /// Max bytes per chunk. chunk_byte_len: u64, + /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1 + /// with block number + highest_block: Option, } impl ChunkedFileReader { @@ -375,7 +386,7 @@ impl ChunkedFileReader { let metadata = file.metadata().await?; let file_byte_len = metadata.len(); - Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len }) + Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None }) } /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk @@ -392,11 +403,9 @@ impl ChunkedFileReader { } } - /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. - pub async fn next_chunk(&mut self) -> Result, T::Error> - where - T: FromReader, - { + /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next + /// chunk to read. + async fn read_next_chunk(&mut self) -> Result, io::Error> { if self.file_byte_len == 0 && self.chunk.is_empty() { dbg!(self.chunk.is_empty()); // eof @@ -431,12 +440,42 @@ impl ChunkedFileReader { "new bytes were read from file" ); + Ok(Some(next_chunk_byte_len as u64)) + } + + /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. + pub async fn next_chunk(&mut self) -> Result, T::Error> + where + T: FromReader, + { + let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) }; + // make new file client from chunk - let (file_client, bytes) = - T::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?; + let DecodedFileChunk { file_client, remaining_bytes, .. } = + T::from_reader(&self.chunk[..], next_chunk_byte_len).await?; // save left over bytes - self.chunk = bytes; + self.chunk = remaining_bytes; + + Ok(Some(file_client)) + } + + /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk. + pub async fn next_receipts_chunk(&mut self) -> Result, T::Error> + where + T: FromReceiptReader, + { + let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) }; + + // make new file client from chunk + let DecodedFileChunk { file_client, remaining_bytes, highest_block } = + T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block) + .await?; + + // save left over bytes + self.chunk = remaining_bytes; + // update highest block + self.highest_block = highest_block; Ok(Some(file_client)) } @@ -446,16 +485,29 @@ impl ChunkedFileReader { pub trait FromReader { /// Error returned by file client type. type Error: From; + /// Returns a file client fn from_reader( reader: B, num_bytes: u64, - ) -> impl Future), Self::Error>> + ) -> impl Future, Self::Error>> where Self: Sized, B: AsyncReadExt + Unpin; } +/// Output from decoding a file chunk with [`FromReader::from_reader`]. +#[derive(Debug)] +pub struct DecodedFileChunk { + /// File client, i.e. the decoded part of chunk. + pub file_client: T, + /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt. + pub remaining_bytes: Vec, + /// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with + /// block number, like receipts. + pub highest_block: Option, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/downloaders/src/lib.rs b/crates/net/downloaders/src/lib.rs index 7944a31a5bae..8d50b6fbc030 100644 --- a/crates/net/downloaders/src/lib.rs +++ b/crates/net/downloaders/src/lib.rs @@ -40,3 +40,5 @@ pub mod file_codec; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; + +pub use file_client::{DecodedFileChunk, FileClientError}; diff --git a/crates/net/downloaders/src/receipt_file_client.rs b/crates/net/downloaders/src/receipt_file_client.rs index 3889350df7ac..fd0bb0901689 100644 --- a/crates/net/downloaders/src/receipt_file_client.rs +++ b/crates/net/downloaders/src/receipt_file_client.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::{fmt, io, marker::PhantomData}; use futures::Future; use reth_primitives::{Receipt, Receipts}; @@ -7,7 +7,7 @@ use tokio_stream::StreamExt; use tokio_util::codec::{Decoder, FramedRead}; use tracing::trace; -use crate::file_client::{FileClientError, FromReader}; +use crate::{DecodedFileChunk, FileClientError}; /// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential /// order w.r.t. block number. @@ -26,7 +26,7 @@ pub struct ReceiptFileClient { /// Constructs a file client from a reader and decoder. pub trait FromReceiptReader { /// Error returned by file client type. - type Error: From; + type Error: From; /// Returns a decoder instance fn decoder() -> D; @@ -34,59 +34,40 @@ pub trait FromReceiptReader { /// Returns a file client fn from_receipt_reader( reader: B, - decoder: D, num_bytes: u64, - ) -> impl Future), Self::Error>> + prev_chunk_highest_block: Option, + ) -> impl Future, Self::Error>> where Self: Sized, B: AsyncReadExt + Unpin; } -impl FromReader for ReceiptFileClient -where - D: Decoder, Error = FileClientError> - + std::fmt::Debug - + Default, -{ - type Error = D::Error; - - fn from_reader( - reader: B, - num_bytes: u64, - ) -> impl Future), Self::Error>> - where - B: AsyncReadExt + Unpin, - { - Self::from_receipt_reader(reader, Self::decoder(), num_bytes) - } -} - impl FromReceiptReader for ReceiptFileClient where D: Decoder, Error = FileClientError> - + std::fmt::Debug + + fmt::Debug + Default, { type Error = D::Error; fn decoder() -> D { - Default::default() + D::default() } /// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If /// first block has no transactions, it's assumed to be the genesis block. fn from_receipt_reader( reader: B, - decoder: D, num_bytes: u64, - ) -> impl Future), Self::Error>> + prev_chunk_highest_block: Option, + ) -> impl Future, Self::Error>> where B: AsyncReadExt + Unpin, { let mut receipts = Receipts::default(); // use with_capacity to make sure the internal buffer contains the entire chunk - let mut stream = FramedRead::with_capacity(reader, decoder, num_bytes as usize); + let mut stream = FramedRead::with_capacity(reader, Self::decoder(), num_bytes as usize); trace!(target: "downloaders::file", target_num_bytes=num_bytes, @@ -152,10 +133,16 @@ where block_number = num + receipts.len() as u64; } None => { - // this is the first block and it's empty, assume it's the genesis - // block - first_block = Some(0); - block_number = 0; + // this is the first block and it's empty + if let Some(highest_block) = prev_chunk_highest_block { + // this is a chunked read and this is not the first chunk + block_number = highest_block + 1; + } else { + // this is not a chunked read or this is the first chunk. assume + // it's the genesis block + block_number = 0; + } + first_block = Some(block_number); } } @@ -196,15 +183,16 @@ where "Initialized receipt file client" ); - Ok(( - Self { + Ok(DecodedFileChunk { + file_client: Self { receipts, first_block: first_block.unwrap_or_default(), total_receipts, _marker: Default::default(), }, remaining_bytes, - )) + highest_block: Some(block_number), + }) } } } @@ -220,10 +208,6 @@ pub struct ReceiptWithBlockNumber { #[cfg(test)] mod test { - use crate::{ - file_client::{FileClientError, FromReader}, - receipt_file_client::{ReceiptFileClient, ReceiptWithBlockNumber}, - }; use alloy_rlp::{Decodable, RlpDecodable}; use reth_primitives::{ hex, Address, Buf, Bytes, BytesMut, Log, LogData, Receipt, TxType, B256, @@ -231,6 +215,9 @@ mod test { use reth_tracing::init_test_tracing; use tokio_util::codec::Decoder; + use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber}; + use crate::{DecodedFileChunk, FileClientError}; + #[derive(Debug, PartialEq, Eq, RlpDecodable)] struct MockReceipt { tx_type: u8, @@ -578,12 +565,16 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let ( - ReceiptFileClient { receipts, first_block, total_receipts, _marker }, - _remaining_bytes, - ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) - .await - .unwrap(); + let DecodedFileChunk { + file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. }, + .. + } = ReceiptFileClient::::from_receipt_reader( + reader, + encoded_byte_len, + None, + ) + .await + .unwrap(); // 2 non-empty receipt objects assert_eq!(2, total_receipts); @@ -610,12 +601,16 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let ( - ReceiptFileClient { receipts, first_block, total_receipts, _marker }, - _remaining_bytes, - ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) - .await - .unwrap(); + let DecodedFileChunk { + file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. }, + .. + } = ReceiptFileClient::::from_receipt_reader( + reader, + encoded_byte_len, + None, + ) + .await + .unwrap(); // 2 non-empty receipt objects assert_eq!(2, total_receipts); @@ -643,12 +638,16 @@ mod test { let encoded_byte_len = encoded_receipts.len() as u64; let reader = &mut &encoded_receipts[..]; - let ( - ReceiptFileClient { receipts, first_block, total_receipts, _marker }, - _remaining_bytes, - ) = ReceiptFileClient::::from_reader(reader, encoded_byte_len) - .await - .unwrap(); + let DecodedFileChunk { + file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. }, + .. + } = ReceiptFileClient::::from_receipt_reader( + reader, + encoded_byte_len, + None, + ) + .await + .unwrap(); // 4 non-empty receipt objects assert_eq!(4, total_receipts); From a054405fa4d96838a829d2e864a288fbba11cd2f Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 27 Aug 2024 21:59:16 +0200 Subject: [PATCH 2/2] Fix lint --- crates/optimism/cli/src/commands/import_receipts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs index cf3f6cf0de27..7c0f483b04f9 100644 --- a/crates/optimism/cli/src/commands/import_receipts.rs +++ b/crates/optimism/cli/src/commands/import_receipts.rs @@ -184,7 +184,7 @@ where let static_file_provider = provider_factory.static_file_provider(); while let Some(file_client) = - reader.next_chunk::>().await? + reader.next_receipts_chunk::, HackReceiptFileCodec>().await? { // create a new file client from chunk read from file let ReceiptFileClient {