Skip to content

Commit

Permalink
WAL refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kaimast committed Jan 21, 2025
1 parent f9910c9 commit f1841eb
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 229 deletions.
209 changes: 5 additions & 204 deletions src/wal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
#![allow(clippy::await_holding_lock)]

#[cfg(not(feature = "_async-io"))]
use std::fs::{File, OpenOptions};

#[cfg(not(feature = "_async-io"))]
use std::io::{Read, Seek};

use std::sync::Arc;

use parking_lot::{Mutex, RwLock};
Expand All @@ -20,14 +14,15 @@ use monoio::fs::{File, OpenOptions};
#[cfg(feature = "wisckey")]
use crate::values::FreelistPageId;

use cfg_if::cfg_if;

use crate::memtable::Memtable;
use crate::{Error, Params, WriteOp};

mod writer;
use writer::WalWriter;

mod reader;
use reader::WalReader;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -146,64 +141,8 @@ impl WriteAheadLog {
// This reads the file(s) in the current thread
// because we cannot send stuff between threads easily

let mut position = start_position;
let mut count: usize = 0;

let fpos = position / PAGE_SIZE;

cfg_if! {
if #[cfg(feature="_async-io")] {
let mut log_file = WalWriter::open_file(&params, fpos).await.map_err(|err| Error::from_io_error("Failed to open write-ahead log", err))?;
} else {
let file_offset = position % PAGE_SIZE;
let mut log_file = WalWriter::open_file(&params, fpos).await.map_err(|err| Error::from_io_error("Failed to open write-ahead log", err))?;
log_file.seek(std::io::SeekFrom::Start(file_offset)).unwrap();
}
}

// Re-insert ops into memtable
loop {
let mut log_type = [0u8; 1];
let success = Self::read_from_log(
&mut log_file,
&mut position,
&mut log_type[..],
&params,
true,
)
.await
.map_err(|err| Error::from_io_error("Failed to read write-ahead log", err))?;

if !success {
break;
}

// Dispatch on the entry tag that was just read into `log_type`.
// `log_type` is a one-byte buffer, so index 0 is its only valid index.
cfg_if! {
    if #[cfg(feature="wisckey")] {
        let success = if log_type[0] == LogEntry::WRITE {
            Self::parse_write_entry(&mut log_file, memtable, &mut position, &params).await?
        } else if log_type[0] == LogEntry::VALUE_DELETION {
            Self::parse_value_deletion_entry(&mut log_file, &mut position, &params).await?
        } else {
            panic!("Unexpected log entry type!");
        };
    } else {
        let success = if log_type[0] == LogEntry::WRITE {
            Self::parse_write_entry(&mut log_file, memtable, &mut position, &params).await?
        } else {
            panic!("Unexpected log entry type!");
        };
    }
}

if !success {
break;
}

count += 1;
}

log::debug!("Found {count} entries in write-ahead log");
let mut reader = WalReader::new(params.clone(), start_position).await?;
let position = reader.run(memtable).await?;

let status = LogStatus {
queue_pos: position,
Expand All @@ -230,64 +169,6 @@ impl WriteAheadLog {
})
}

async fn parse_write_entry(
log_file: &mut File,
memtable: &mut Memtable,
position: &mut u64,
params: &Params,
) -> Result<bool, Error> {
const KEY_LEN_SIZE: usize = std::mem::size_of::<u64>();
const HEADER_SIZE: usize = std::mem::size_of::<u8>() + KEY_LEN_SIZE;

let mut op_header = [0u8; HEADER_SIZE];
let success = Self::read_from_log(log_file, position, &mut op_header[..], &params, true)
.await
.map_err(|err| Error::from_io_error("Failed to read write-ahead log", err))?;

if !success {
return Ok(false);
}

let op_type = op_header[1];

let key_len_data: &[u8; KEY_LEN_SIZE] = &op_header[1..].try_into().unwrap();
let key_len = u64::from_le_bytes(*key_len_data);

let mut key = vec![0; key_len as usize];
Self::read_from_log(log_file, position, &mut key, &params, false)
.await
.unwrap();

if op_type == WriteOp::PUT_OP {
let mut val_len = [0u8; 8];
Self::read_from_log(log_file, position, &mut val_len, &params, false)
.await
.unwrap();

let val_len = u64::from_le_bytes(val_len);
let mut value = vec![0; val_len as usize];

Self::read_from_log(log_file, position, &mut value, &params, false)
.await
.unwrap();
memtable.put(key, value);
} else if op_type == WriteOp::DELETE_OP {
memtable.delete(key);
} else {
panic!("Unexpected op type!");
}

Ok(true)
}

/// Parses a value-deletion entry from the log.
///
/// Not implemented yet: the body is `todo!()`, so calling this function
/// always panics. None of the parameters are used so far.
async fn parse_value_deletion_entry(
log_file: &mut File,
position: &mut u64,
params: &Params,
) -> Result<bool, Error> {
todo!();
}

/// Spawns the background task that will actually write
/// to the WAL.
///
Expand Down Expand Up @@ -363,86 +244,6 @@ impl WriteAheadLog {
finish_receiver
}

/// Fills `out` with the next `out.len()` bytes of the log
/// (only used during recovery).
///
/// `*position` is the logical log position; it is advanced by the number of
/// bytes read. The offset inside the current file is `*position % PAGE_SIZE`,
/// and whenever a read ends exactly on a `PAGE_SIZE` boundary, `log_file` is
/// replaced by the file for the next page.
///
/// Without the `_async-io` feature the data is read from the file's current
/// cursor, so the cursor is expected to already match `*position`.
///
/// If `maybe` is set, a failed read yields `Ok(false)` instead of an error,
/// and a missing next file is created, in which case the result tells
/// whether `out` was filled completely. If `maybe` is not set, both
/// situations are returned as errors.
///
/// Panics if `out` is empty.
///
/// TODO: Change this to just fetch an entire page at a time
async fn read_from_log(
    log_file: &mut File,
    position: &mut u64,
    out: &mut [u8],
    params: &Params,
    maybe: bool,
) -> Result<bool, std::io::Error> {
    let initial_position = *position;
    let total = out.len() as u64;
    assert!(total > 0);

    let mut done: u64 = 0;

    while done < total {
        let page_offset = *position % PAGE_SIZE;
        let page_remaining = PAGE_SIZE - page_offset;
        assert!(page_remaining > 0);

        // Never read past the end of the current file.
        let chunk_len = page_remaining.min(total - done);
        let chunk = &mut out[done as usize..(done + chunk_len) as usize];

        cfg_if! {
            if #[cfg(feature="_async-io")] {
                let scratch = vec![0u8; chunk.len()];
                let (read_result, scratch) = log_file.read_exact_at(scratch, page_offset).await;
                if read_result.is_ok() {
                    chunk.copy_from_slice(&scratch);
                }
            } else {
                let read_result = log_file.read_exact(chunk);
            }
        }

        if let Err(err) = read_result {
            return if maybe { Ok(false) } else { Err(err) };
        }

        *position += chunk_len;
        let page_end = page_offset + chunk_len;
        assert!(page_end <= PAGE_SIZE);
        done = *position - initial_position;

        if page_end == PAGE_SIZE {
            // The current file is exhausted; continue in the next one.
            let next_page = *position / PAGE_SIZE;

            match WalWriter::open_file(params, next_page).await {
                Ok(file) => *log_file = file,
                Err(_) if maybe => {
                    *log_file = WalWriter::create_file(params, next_page).await?;
                    return Ok(done == total);
                }
                Err(err) => return Err(err),
            }
        }
    }

    Ok(true)
}

/// Stores an operation and returns the new position in
/// the logfile
#[tracing::instrument(skip(self, batch))]
Expand Down
Loading

0 comments on commit f1841eb

Please sign in to comment.