Skip to content

Commit

Permalink
Update records.rs to fix merge issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aovestdipaperino authored Feb 4, 2025
1 parent 382aab2 commit 0695ef3
Showing 1 changed file with 102 additions and 46 deletions.
148 changes: 102 additions & 46 deletions src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! for topic in res.responses {
//! for partition in topic.partitions {
//! let mut records = partition.records.unwrap();
//! let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap();
//! let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap();
//! }
//! }
//!
Expand Down Expand Up @@ -157,20 +157,37 @@ pub struct Record {
const MAGIC_BYTE_OFFSET: usize = 16;

impl RecordBatchEncoder {
/// Encode `records` into `buf`, using the provided encoding options to select the
/// encoding strategy based on version.
///
/// This is a convenience wrapper: it forwards to `encode_with_custom_compression`
/// with no custom compressor (`None`).
pub fn encode<'a, B, I>(buf: &mut B, records: I, options: &RecordEncodeOptions) -> Result<()>
where
    B: ByteBufMut,
    I: IntoIterator<Item = &'a Record>,
    I::IntoIter: Clone,
{
    // The `None` must carry a concrete function-pointer type so that the
    // compressor type parameter of the callee can be resolved.
    let no_compressor: Option<fn(&mut BytesMut, &mut B, Compression) -> Result<()>> = None;
    Self::encode_with_custom_compression(buf, records, options, no_compressor)
}

/// Encode records into given buffer, using provided encoding options that select the encoding
/// strategy based on version.
/// # Arguments
/// * `compressor` - A function that compresses the given batch of records.
///
/// If `None`, the right compression algorithm will automatically be selected and applied.
pub fn encode<'a, B, I, CF>(
pub fn encode_with_custom_compression<'a, B, I, CF>(
buf: &mut B,
records: I,
options: &RecordEncodeOptions,
compressor: Option<CF>,
) -> Result<()>
where
B: ByteBufMut,
I: IntoIterator<Item=&'a Record>,
I: IntoIterator<Item = &'a Record>,
I::IntoIter: Clone,
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
{
Expand All @@ -188,7 +205,7 @@ impl RecordBatchEncoder {
) -> Result<()>
where
B: ByteBufMut,
I: Iterator<Item=&'a Record> + Clone,
I: Iterator<Item = &'a Record> + Clone,
{
for record in records {
record.encode_legacy(buf, options)?;
Expand All @@ -203,7 +220,7 @@ impl RecordBatchEncoder {
) -> Result<()>
where
B: ByteBufMut,
I: Iterator<Item=&'a Record> + Clone,
I: Iterator<Item = &'a Record> + Clone,
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
{
if options.compression == Compression::None {
Expand Down Expand Up @@ -239,19 +256,27 @@ impl RecordBatchEncoder {
compressor(&mut encoded_buf, buf, options.compression)?;
} else {
match options.compression {
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
_ => unimplemented!(),
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
}
}

Expand Down Expand Up @@ -280,7 +305,7 @@ impl RecordBatchEncoder {
) -> Result<()>
where
B: ByteBufMut,
I: Iterator<Item=&'a Record>,
I: Iterator<Item = &'a Record>,
{
for record in records {
record.encode_new(buf, min_offset, min_timestamp, options)?;
Expand All @@ -296,7 +321,7 @@ impl RecordBatchEncoder {
) -> Result<bool>
where
B: ByteBufMut,
I: Iterator<Item=&'a Record> + Clone,
I: Iterator<Item = &'a Record> + Clone,
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
{
let mut record_peeker = records.clone();
Expand All @@ -316,7 +341,7 @@ impl RecordBatchEncoder {
&& record.producer_id == first_record.producer_id
&& record.producer_epoch == first_record.producer_epoch
&& (record.offset as i32).wrapping_sub(record.sequence)
== (first_record.offset as i32).wrapping_sub(first_record.sequence)
== (first_record.offset as i32).wrapping_sub(first_record.sequence)
})
.count()
+ 1;
Expand Down Expand Up @@ -416,18 +441,28 @@ impl RecordBatchEncoder {
Compression::None => cmpr::None::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[allow(unreachable_patterns)]
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
}
}
let batch_end = buf.offset();
Expand Down Expand Up @@ -458,15 +493,14 @@ impl RecordBatchEncoder {
) -> Result<()>
where
B: ByteBufMut,
I: Iterator<Item=&'a Record> + Clone,
I: Iterator<Item = &'a Record> + Clone,
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
{
while Self::encode_new_batch(buf, &mut records, options, compressor.as_ref())? {}
Ok(())
}
}


struct RecordIterator
{
buf: Bytes,
Expand Down Expand Up @@ -501,22 +535,33 @@ impl RecordIterator {
}
}


impl RecordBatchDecoder {
/// Decode the provided buffer into a boxed iterator of records.
///
/// The returned iterator is a `RecordIterator` built from `buf` and `version`.
/// # Arguments
/// * `buf` - The buffer to decode.
/// * `version` - The version of the record batch.
pub fn records(buf: &mut Bytes, version: i8) -> Box<dyn Iterator<Item = Record>> {
    let iter = RecordIterator::new(buf, version);
    Box::new(iter)
}


/// Decode the provided buffer into a vec of records.
///
/// This is a convenience wrapper: it forwards to `decode_with_custom_compression`
/// with no custom decompressor (`None`).
pub fn decode<B: ByteBuf>(buf: &mut B) -> Result<Vec<Record>> {
    // The `None` must carry a concrete function-pointer type so that the
    // decompressor type parameter of the callee can be resolved.
    let no_decompressor: Option<fn(&mut bytes::Bytes, Compression) -> Result<B>> = None;
    Self::decode_with_custom_compression(buf, no_decompressor)
}

/// Decode the provided buffer into a vec of records.
/// # Arguments
/// * `decompressor` - A function that decompresses the given batch of records.
///
/// If `None`, the right decompression algorithm will automatically be selected and applied.
pub fn decode<B: ByteBuf, F>(buf: &mut B, decompressor: Option<F>) -> Result<Vec<Record>>
pub fn decode_with_custom_compression<B: ByteBuf, F>(
buf: &mut B,
decompressor: Option<F>,
) -> Result<Vec<Record>>
where
F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
{
Expand All @@ -526,35 +571,7 @@ impl RecordBatchDecoder {
}
Ok(records)
}
/// Dispatch decoding of the batch in `buf` according to its magic byte.
///
/// The byte at `MAGIC_BYTE_OFFSET` is peeked to obtain the version: versions 0 and 1
/// are handed to `Record::decode_legacy`, version 2 to `Self::decode_new_batch`
/// (which also receives `decompress_func`). Any other version is an error.
/// Decoded records are appended to `records`.
fn decode_batch<B: ByteBuf, F>(
    buf: &mut B,
    records: &mut Vec<Record>,
    decompress_func: Option<&F>,
) -> Result<()>
where
    F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
{
    let magic = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?;
    let version = magic[0] as i8;
    match version {
        2 => Self::decode_new_batch(buf, version, records, decompress_func),
        0 | 1 => Record::decode_legacy(buf, version, records),
        _ => {
            bail!("Unknown record batch version ({})", version);
        }
    }
}
/// Decode `batch_decode_info.record_count` records from `buf` with `Record::decode_new`
/// and append them to `records`.
///
/// Stops at the first record that fails to decode and returns that error; records
/// decoded before the failure remain in `records`.
fn decode_new_records<B: ByteBuf>(
    buf: &mut B,
    batch_decode_info: &BatchDecodeInfo,
    version: i8,
    records: &mut Vec<Record>,
) -> Result<()> {
    let expected = batch_decode_info.record_count;
    // NOTE(review): if `record_count` is taken verbatim from wire data, this
    // up-front reservation could be very large — confirm it is validated upstream.
    records.reserve(expected);
    (0..expected).try_for_each(|_| -> Result<()> {
        let record = Record::decode_new(buf, batch_decode_info, version)?;
        records.push(record);
        Ok(())
    })
}

fn decode_batch_info<B: ByteBuf>(
buf: &mut B,
version: i8,
Expand Down Expand Up @@ -652,6 +669,36 @@ impl RecordBatchDecoder {
compression,
}, buf.to_owned()))
}

/// Dispatch decoding of the batch in `buf` according to its magic byte.
///
/// The byte at `MAGIC_BYTE_OFFSET` is peeked to obtain the version: versions 0 and 1
/// are handed to `Record::decode_legacy`, version 2 to `Self::decode_new_batch`
/// (which also receives `decompress_func`). Any other version is an error.
/// Decoded records are appended to `records`.
fn decode_batch<B: ByteBuf, F>(
    buf: &mut B,
    records: &mut Vec<Record>,
    decompress_func: Option<&F>,
) -> Result<()>
where
    F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
{
    let magic = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?;
    let version = magic[0] as i8;
    match version {
        2 => Self::decode_new_batch(buf, version, records, decompress_func),
        0 | 1 => Record::decode_legacy(buf, version, records),
        _ => {
            bail!("Unknown record batch version ({})", version);
        }
    }
}
/// Decode `batch_decode_info.record_count` records from `buf` with `Record::decode_new`
/// and append them to `records`.
///
/// Stops at the first record that fails to decode and returns that error; records
/// decoded before the failure remain in `records`.
fn decode_new_records<B: ByteBuf>(
    buf: &mut B,
    batch_decode_info: &BatchDecodeInfo,
    version: i8,
    records: &mut Vec<Record>,
) -> Result<()> {
    let expected = batch_decode_info.record_count;
    // NOTE(review): if `record_count` is taken verbatim from wire data, this
    // up-front reservation could be very large — confirm it is validated upstream.
    records.reserve(expected);
    (0..expected).try_for_each(|_| -> Result<()> {
        let record = Record::decode_new(buf, batch_decode_info, version)?;
        records.push(record);
        Ok(())
    })
}
fn decode_new_batch<B: ByteBuf, F>(
buf: &mut B,
version: i8,
Expand All @@ -664,28 +711,37 @@ impl RecordBatchDecoder {
let (batch_decode_info, mut buf) = Self::decode_batch_info(buf, version)?;
let compression = batch_decode_info.compression;


if let Some(decompress_func) = decompress_func {
let mut decompressed_buf = decompress_func(&mut buf, compression)?;

Self::decode_new_records(&mut decompressed_buf, &batch_decode_info, version, records)?;
} else {
match batch_decode_info.compression {
match compression {
Compression::None => cmpr::None::decompress(&mut buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::decompress(&mut buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::decompress(&mut buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::decompress(&mut buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::decompress(&mut buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[allow(unreachable_patterns)]
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
};
}

Expand Down

0 comments on commit 0695ef3

Please sign in to comment.