feat: COPY TO support
Fixes #962

Signed-off-by: Vaibhav <[email protected]>
vrongmeal committed Jul 4, 2023
1 parent 1cdec0d commit 63a1ea6
Showing 20 changed files with 1,495 additions and 272 deletions.
9 changes: 9 additions & 0 deletions crates/datasources/src/common/errors.rs
@@ -17,6 +17,15 @@ pub enum DatasourceCommonError {

    #[error(transparent)]
    FmtError(#[from] core::fmt::Error),

    #[error(transparent)]
    ObjectStoreError(#[from] object_store::Error),

    #[error(transparent)]
    ArrowError(#[from] datafusion::arrow::error::ArrowError),

    #[error(transparent)]
    DatafusionError(#[from] datafusion::common::DataFusionError),
}

pub type Result<T, E = DatasourceCommonError> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions crates/datasources/src/common/mod.rs
@@ -1,5 +1,6 @@
//! Common data source utilities.
pub mod errors;
pub mod listing;
pub mod sink;
pub mod ssh;
pub mod util;
141 changes: 141 additions & 0 deletions crates/datasources/src/common/sink/csv.rs
@@ -0,0 +1,141 @@
use std::fmt;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::csv::{Writer as CsvWriter, WriterBuilder as CsvWriterBuilder};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use crate::common::errors::Result;

use super::SharedBuffer;

const BUFFER_SIZE: usize = 2 * 1024 * 1024;

#[derive(Debug, Clone)]
pub struct CsvSinkOpts {
    /// Delimiter between values.
    pub delim: u8,
    /// Include header.
    pub header: bool,
}

impl Default for CsvSinkOpts {
    fn default() -> Self {
        CsvSinkOpts {
            delim: b',',
            header: true,
        }
    }
}

#[derive(Debug)]
pub struct CsvSink {
    store: Arc<dyn ObjectStore>,
    loc: ObjectPath,
    opts: CsvSinkOpts,
}

impl fmt::Display for CsvSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CsvSink({}:{})", self.store, self.loc)
    }
}

impl CsvSink {
    pub fn from_obj_store(
        store: Arc<dyn ObjectStore>,
        loc: impl Into<ObjectPath>,
        opts: CsvSinkOpts,
    ) -> CsvSink {
        CsvSink {
            store,
            loc: loc.into(),
            opts,
        }
    }

    async fn stream_into_inner(&self, mut stream: SendableRecordBatchStream) -> Result<usize> {
        let (_id, obj_handle) = self.store.put_multipart(&self.loc).await?;
        let mut writer = AsyncCsvWriter::new(obj_handle, BUFFER_SIZE, &self.opts);

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            writer.write_batch(&batch).await?;
        }
        // Report the number of rows written instead of discarding it.
        let row_count = writer.finish().await?;

        Ok(row_count)
    }
}

#[async_trait]
impl DataSink for CsvSink {
    async fn write_all(&self, stream: SendableRecordBatchStream) -> DfResult<u64> {
        self.stream_into_inner(stream)
            .await
            .map(|x| x as u64)
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }
}

/// Wrapper around Arrow's csv writer to provide async write support.
///
/// Modeled after the parquet crate's `AsyncArrowWriter`.
struct AsyncCsvWriter<W> {
    async_writer: W,
    sync_writer: CsvWriter<SharedBuffer>,
    buffer: SharedBuffer,
    row_count: usize,
}

impl<W: AsyncWrite + Unpin + Send> AsyncCsvWriter<W> {
    fn new(async_writer: W, buf_size: usize, sink_opts: &CsvSinkOpts) -> Self {
        let buf = SharedBuffer::with_capacity(buf_size);
        let sync_writer = CsvWriterBuilder::new()
            .with_delimiter(sink_opts.delim)
            .has_headers(sink_opts.header)
            .build(buf.clone());

        AsyncCsvWriter {
            async_writer,
            sync_writer,
            buffer: buf,
            row_count: 0,
        }
    }

    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<()> {
        self.sync_writer.write(batch)?;
        self.try_flush(false).await?;
        self.row_count += batch.num_rows();
        Ok(())
    }

    async fn finish(mut self) -> Result<usize> {
        self.try_flush(true).await?;
        self.async_writer.shutdown().await?;
        Ok(self.row_count)
    }

    async fn try_flush(&mut self, force: bool) -> Result<()> {
        let mut buf = self.buffer.buffer.try_lock().unwrap();
        if !force && buf.len() < buf.capacity() / 2 {
            return Ok(());
        }

        self.async_writer.write_all(&buf).await?;
        self.async_writer.flush().await?;

        buf.clear();

        Ok(())
    }
}
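For context, a minimal sketch of driving CsvSink end to end (not part of the commit): copy_to_csv is a hypothetical driver, and the InMemory store plus DataFusion's MemoryStream stand in for a real object store and query stream.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::memory::MemoryStream;
use object_store::memory::InMemory;

use crate::common::sink::csv::{CsvSink, CsvSinkOpts};

// Hypothetical driver: push one batch through the sink and report rows written.
async fn copy_to_csv() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Any ObjectStore implementation works; InMemory keeps the sketch self-contained.
    let store = Arc::new(InMemory::new());
    let sink = CsvSink::from_obj_store(store, "out.csv", CsvSinkOpts::default());

    // MemoryStream adapts in-memory batches into a SendableRecordBatchStream.
    let stream = Box::pin(MemoryStream::try_new(vec![batch], schema, None)?);
    let rows = sink.write_all(stream).await?;
    println!("wrote {rows} rows");
    Ok(())
}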
141 changes: 141 additions & 0 deletions crates/datasources/src/common/sink/json.rs
@@ -0,0 +1,141 @@
use async_trait::async_trait;
use datafusion::arrow::json::writer::{JsonArray, JsonFormat, LineDelimited, Writer as JsonWriter};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result as DfResult;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::{path::Path as ObjectPath, ObjectStore};
use std::fmt::Display;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use crate::common::errors::Result;

use super::SharedBuffer;

const BUFFER_SIZE: usize = 2 * 1024 * 1024;

#[derive(Debug, Clone)]
pub struct JsonSinkOpts {
    /// Whether the batches should be written out as a JSON array.
    pub array: bool,
}

#[allow(clippy::derivable_impls)]
impl Default for JsonSinkOpts {
    fn default() -> Self {
        JsonSinkOpts { array: false }
    }
}

#[derive(Debug)]
pub struct JsonSink {
    store: Arc<dyn ObjectStore>,
    loc: ObjectPath,
    opts: JsonSinkOpts,
}

impl Display for JsonSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "JsonSink({}:{})", self.store, self.loc)
    }
}

impl JsonSink {
    pub fn from_obj_store(
        store: Arc<dyn ObjectStore>,
        loc: impl Into<ObjectPath>,
        opts: JsonSinkOpts,
    ) -> JsonSink {
        JsonSink {
            store,
            loc: loc.into(),
            opts,
        }
    }

    async fn stream_into_inner(&self, stream: SendableRecordBatchStream) -> Result<usize> {
        Ok(if self.opts.array {
            self.formatted_stream::<JsonArray>(stream).await?
        } else {
            self.formatted_stream::<LineDelimited>(stream).await?
        })
    }

    async fn formatted_stream<F: JsonFormat>(
        &self,
        mut stream: SendableRecordBatchStream,
    ) -> Result<usize> {
        let (_id, obj_handle) = self.store.put_multipart(&self.loc).await?;
        let mut writer = AsyncJsonWriter::<_, F>::new(obj_handle, BUFFER_SIZE);
        while let Some(batch) = stream.next().await {
            let batch = batch?;
            writer.write_batch(batch).await?;
        }
        writer.finish().await
    }
}

#[async_trait]
impl DataSink for JsonSink {
    async fn write_all(&self, data: SendableRecordBatchStream) -> DfResult<u64> {
        self.stream_into_inner(data)
            .await
            .map(|x| x as u64)
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }
}

/// Wrapper around Arrow's json writer to provide async write support.
///
/// Modeled after the parquet crate's `AsyncArrowWriter`.
struct AsyncJsonWriter<W, F: JsonFormat> {
    async_writer: W,
    sync_writer: JsonWriter<SharedBuffer, F>,
    buffer: SharedBuffer,
    row_count: usize,
}

impl<W: AsyncWrite + Unpin + Send, F: JsonFormat> AsyncJsonWriter<W, F> {
    fn new(async_writer: W, buf_size: usize) -> Self {
        let buf = SharedBuffer::with_capacity(buf_size);
        let sync_writer = JsonWriter::new(buf.clone());
        AsyncJsonWriter {
            async_writer,
            sync_writer,
            buffer: buf,
            row_count: 0,
        }
    }

    async fn write_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let num_rows = batch.num_rows();
        self.sync_writer.write(&batch)?;
        self.try_flush(false).await?;
        self.row_count += num_rows;
        Ok(())
    }

    async fn finish(mut self) -> Result<usize> {
        self.sync_writer.finish()?;
        self.try_flush(true).await?;
        self.async_writer.shutdown().await?;
        Ok(self.row_count)
    }

    async fn try_flush(&mut self, force: bool) -> Result<()> {
        let mut buf = self.buffer.buffer.try_lock().unwrap();
        if !force && buf.len() < buf.capacity() / 2 {
            return Ok(());
        }

        self.async_writer.write_all(&buf).await?;
        self.async_writer.flush().await?;

        buf.clear();

        Ok(())
    }
}
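For reference, the two JsonFormat type parameters only change the framing of the emitted bytes. Assuming a single column a with values 1 and 2, LineDelimited (array: false, the default) writes one object per line:

{"a":1}
{"a":2}

while JsonArray (array: true) wraps the same objects in a single array:

[{"a":1},{"a":2}]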
34 changes: 34 additions & 0 deletions crates/datasources/src/common/sink/mod.rs
@@ -0,0 +1,34 @@
pub mod csv;
pub mod json;
pub mod parquet;

use std::io::{self, Write};
use std::sync::Arc;

/// A simple buffer to aid in converting writers to async writers. It's expected
/// that the lock has no contention.
#[derive(Clone)]
pub struct SharedBuffer {
    pub buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
    /// Create a new buffer with capacity.
    pub fn with_capacity(cap: usize) -> Self {
        SharedBuffer {
            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(cap))),
        }
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::write(&mut *buffer, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let mut buffer = self.buffer.try_lock().unwrap();
        Write::flush(&mut *buffer)
    }
}
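This buffer is the sync-to-async bridge the CSV and JSON sinks rely on: a synchronous writer fills SharedBuffer through its Write impl, and the owner periodically drains it into an async destination. A minimal sketch of that pattern, with drain_into as a hypothetical helper (not part of the commit) mirroring the sinks' try_flush methods:

use tokio::io::{AsyncWrite, AsyncWriteExt};

// Hypothetical helper: after a sync writer has filled `buf` through the
// Write impl above, drain its contents into any AsyncWrite and reuse it.
async fn drain_into<W: AsyncWrite + Unpin>(
    buf: &SharedBuffer,
    out: &mut W,
) -> std::io::Result<()> {
    // try_lock is fine under the stated no-contention assumption.
    let mut inner = buf.buffer.try_lock().unwrap();
    out.write_all(&inner).await?;
    out.flush().await?;
    inner.clear();
    Ok(())
}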