Skip to content

Commit

Permalink
perf: refactor segment reader
Browse files Browse the repository at this point in the history
skip heap allocation for no compression
  • Loading branch information
marvin-j97 committed Jan 22, 2025
1 parent f73c8b0 commit c968c0f
Showing 1 changed file with 25 additions and 45 deletions.
70 changes: 25 additions & 45 deletions src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
// (found in the LICENSE-* files in the repository)

use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, UserValue};
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, Slice, UserValue};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
fs::File,
io::{BufReader, Read, Seek},
path::Path,
};

macro_rules! fail_iter {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Some(Err(e.into())),
}
};
}

/// Reads through a segment in order.
pub struct Reader<C: Compressor + Clone> {
pub(crate) segment_id: SegmentId,
Expand Down Expand Up @@ -79,54 +88,25 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {
}
}

let checksum = match self.inner.read_u64::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};
let checksum = fail_iter!(self.inner.read_u64::<BigEndian>());

let key_len = match self.inner.read_u16::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut key = vec![0; key_len.into()];
if let Err(e) = self.inner.read_exact(&mut key) {
return Some(Err(e.into()));
};

let val_len = match self.inner.read_u32::<BigEndian>() {
Ok(v) => v,
Err(e) => {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
return Some(Err(e.into()));
}
};

let mut val = vec![0; val_len as usize];
if let Err(e) = self.inner.read_exact(&mut val) {
return Some(Err(e.into()));
};
let key_len = fail_iter!(self.inner.read_u16::<BigEndian>());
let key = fail_iter!(Slice::from_reader(&mut self.inner, key_len as usize));

let val_len = fail_iter!(self.inner.read_u32::<BigEndian>());
let val = match &self.compression {
Some(compressor) => match compressor.decompress(&val) {
Ok(val) => val,
Err(e) => return Some(Err(e)),
},
None => val,
Some(compressor) => {
let mut val = vec![0; val_len as usize];
fail_iter!(self.inner.read_exact(&mut val));
Slice::from(fail_iter!(compressor.decompress(&val)))
}
None => {
// NOTE: When not using compression, we can skip
// the intermediary heap allocation and read directly into a Slice
fail_iter!(Slice::from_reader(&mut self.inner, val_len as usize))
}
};

Some(Ok((key.into(), val.into(), checksum)))
Some(Ok((key, val, checksum)))
}
}

0 comments on commit c968c0f

Please sign in to comment.