Skip to content

Commit

Permalink
feat: demo for async writer
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Mar 16, 2023
1 parent 488b7ba commit f62cedc
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
all-features = true

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64", "async"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
# Enable CLI tools
Expand Down
75 changes: 74 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
use std::collections::VecDeque;
use std::io::Write;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use arrow_array::cast::as_primitive_array;
use arrow_array::types::Decimal128Type;
use arrow_array::{types, Array, ArrayRef, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
Expand Down Expand Up @@ -620,6 +621,78 @@ fn get_fsb_array_slice(
values
}

/// An in-memory byte sink whose storage is shared between all of its clones.
///
/// Cloning duplicates the `Arc` handle, not the bytes, so data written through
/// one handle is visible to (and can be drained by) every other handle.
#[derive(Clone, Default)]
struct SharedBuffer {
    /// Backing storage; the mutex lets every handle mutate the same vector.
    buffer: Arc<Mutex<Vec<u8>>>,
}

impl Write for SharedBuffer {
    /// Appends all of `buf` to the shared vector and returns `buf.len()`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut bytes = self.buffer.lock().unwrap();
        bytes.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Delegates to the vector's own `flush` while holding the lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn flush(&mut self) -> std::io::Result<()> {
        self.buffer.lock().unwrap().flush()
    }
}

/// Arrow-to-parquet writer that targets an asynchronous sink `W`.
///
/// Record batches are encoded by a synchronous `ArrowWriter` into an in-memory
/// `SharedBuffer`; the bytes accumulated there are then moved to `async_writer`.
pub struct AsyncArrowWriter<W> {
/// Synchronous writer whose output goes into a clone of `shared_buffer`.
sync_writer: ArrowWriter<SharedBuffer>,
/// Destination that receives the bytes drained from `shared_buffer`.
async_writer: W,
/// Handle to the same storage `sync_writer` writes into, used to drain it.
shared_buffer: SharedBuffer,
}

impl<W: AsyncWrite + Unpin> AsyncArrowWriter<W> {
    /// Creates a writer that encodes batches with `arrow_schema` and `props`
    /// and forwards the encoded bytes to `writer`.
    ///
    /// # Errors
    ///
    /// Returns any error produced by `ArrowWriter::try_new`.
    pub fn try_new(
        writer: W,
        arrow_schema: SchemaRef,
        props: Option<WriterProperties>,
    ) -> Result<Self> {
        let shared_buffer = SharedBuffer::default();
        // The synchronous writer gets a clone of the handle, so everything it
        // emits lands in storage this struct can drain later.
        let sync_writer =
            ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;

        Ok(Self {
            sync_writer,
            async_writer: writer,
            shared_buffer,
        })
    }

    /// Encodes `batch` with the synchronous writer, then forwards whatever
    /// bytes that produced to the async writer.
    ///
    /// # Errors
    ///
    /// Returns encoding errors from the synchronous writer, or
    /// `ParquetError::External` wrapping an I/O error from the async writer.
    pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        self.sync_writer.write(batch)?;
        Self::flush(&self.shared_buffer, &mut self.async_writer).await
    }

    /// Closes the synchronous writer, forwards the remaining bytes to the
    /// async writer, flushes it and returns the file metadata.
    ///
    /// # Errors
    ///
    /// Returns errors from closing the synchronous writer, or
    /// `ParquetError::External` wrapping an I/O error from the async writer.
    pub async fn close(mut self) -> Result<crate::format::FileMetaData> {
        let metadata = self.sync_writer.close()?;
        Self::flush(&self.shared_buffer, &mut self.async_writer).await?;

        // `self` is consumed here, so this is the last chance to push bytes
        // that a buffering `AsyncWrite` may still be holding internally.
        self.async_writer
            .flush()
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        Ok(metadata)
    }

    /// Moves every byte currently held in `shared_buffer` to `async_writer`.
    ///
    /// This does not flush `async_writer` itself. It is a no-op when the
    /// shared buffer is empty.
    async fn flush(shared_buffer: &SharedBuffer, async_writer: &mut W) -> Result<()> {
        // Take the pending bytes inside a short scope so the mutex guard is
        // dropped before the `.await` below.
        let mut pending = {
            let mut guard = shared_buffer.buffer.lock().unwrap();

            if guard.is_empty() {
                return Ok(());
            }
            std::mem::take(&mut *guard)
        };

        // `write` may accept only a prefix of the slice; `write_all` keeps
        // going until every byte is written, so no encoded data is dropped.
        async_writer
            .write_all(&pending)
            .await
            .map_err(|e| ParquetError::External(Box::new(e)))?;

        // Hand the (now empty) allocation back so its capacity is reused.
        pending.clear();
        *shared_buffer.buffer.lock().unwrap() = pending;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit f62cedc

Please sign in to comment.