Commit

fix: insert metrics tracking (#2935)
tychoish committed Apr 26, 2024
1 parent 60919c9 commit 8955580
Showing 2 changed files with 66 additions and 54 deletions.
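
Both files apply the same pattern: the stream returned from execute() is wrapped in datafusion_ext's DataSourceMetricsStreamAdapter along with the partition index and the plan's metrics set (`&self.metrics`), so the rows produced by an insert are tracked in the operator's metrics. As a rough, hypothetical sketch of what such a wrapper does — built on DataFusion's public BaselineMetrics API, not GlareDB's actual implementation:

```rust
// Hypothetical sketch of a metrics-recording stream wrapper, to illustrate the
// pattern; GlareDB's actual DataSourceMetricsStreamAdapter lives in
// datafusion_ext::metrics and may differ in detail.
use std::pin::Pin;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};

/// Wraps an inner record batch stream and tallies the rows it yields into the
/// plan's metrics set for a single partition.
pub struct MetricsStreamAdapter {
    inner: SendableRecordBatchStream,
    baseline: BaselineMetrics,
}

impl MetricsStreamAdapter {
    pub fn new(
        inner: SendableRecordBatchStream,
        partition: usize,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Self {
        Self {
            inner,
            baseline: BaselineMetrics::new(metrics, partition),
        }
    }
}

impl Stream for MetricsStreamAdapter {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // record_poll counts the rows of every batch the inner stream yields
        // and marks the output as finished when the stream ends.
        let poll = this.inner.poll_next_unpin(cx);
        this.baseline.record_poll(poll)
    }
}

impl RecordBatchStream for MetricsStreamAdapter {
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }
}
```

Used the way the diff shows it — `DataSourceMetricsStreamAdapter::new(inner_stream, partition, &self.metrics)` — the wrapper is transparent to downstream operators while the partition's output row counts land in the plan's metrics.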
58 changes: 32 additions & 26 deletions crates/datasources/src/lance/mod.rs
@@ -23,6 +23,7 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream,
     Statistics,
 };
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::StreamExt;
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::{WriteMode, WriteParams};
@@ -147,35 +148,40 @@ impl ExecutionPlan for LanceInsertExecPlan {

     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         ctx: Arc<TaskContext>,
     ) -> datafusion::error::Result<SendableRecordBatchStream> {
-        let mut stream = execute_stream(self.input.clone(), ctx)?.chunks(32);
+        let mut input = execute_stream(self.input.clone(), ctx)?.chunks(32);
         let mut ds = self.dataset.clone();
-        let schema = self.input.schema();
-
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            schema.clone(),
-            futures::stream::once(async move {
-                let write_opts = WriteParams {
-                    mode: WriteMode::Append,
-                    ..Default::default()
-                };
-
-                let mut count: u64 = 0;
-                while let Some(batches) = stream.next().await {
-                    let start = ds.count_rows().await?;
-                    let rbi = RecordBatchIterator::new(
-                        batches
-                            .into_iter()
-                            .map(|v| v.map_err(|dfe| ArrowError::ExternalError(Box::new(dfe)))),
-                        schema.clone(),
-                    );
-                    ds.append(rbi, Some(write_opts.clone())).await?;
-                    count += (ds.count_rows().await? - start) as u64;
-                }
-                Ok::<RecordBatch, DataFusionError>(create_count_record_batch(count))
-            }),
+        let schema = self.input.schema().clone();
+
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            RecordBatchStreamAdapter::new(
+                COUNT_SCHEMA.clone(),
+                futures::stream::once(async move {
+                    let write_opts = WriteParams {
+                        mode: WriteMode::Append,
+                        ..Default::default()
+                    };
+
+                    let mut count: u64 = 0;
+                    while let Some(batches) = input.next().await {
+                        let start = ds.count_rows().await?;
+                        let rbi = RecordBatchIterator::new(
+                            batches
+                                .into_iter()
+                                .map(|v| v.map_err(|dfe| ArrowError::ExternalError(Box::new(dfe)))),
+                            schema.clone(),
+                        );
+                        ds.append(rbi, Some(write_opts.clone())).await?;
+                        count += (ds.count_rows().await? - start) as u64;
+                    }
+                    Ok::<RecordBatch, DataFusionError>(create_count_record_batch(count))
+                })
+                .boxed(),
+            ),
+            partition,
+            &self.metrics,
         )))
     }

62 changes: 34 additions & 28 deletions crates/datasources/src/mongodb/insert.rs
@@ -17,6 +17,7 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream,
     Statistics,
 };
+use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
 use futures::StreamExt;
 use mongodb::bson::RawDocumentBuf;
 use mongodb::Collection;
@@ -96,37 +97,42 @@ impl ExecutionPlan for MongoDbInsertExecPlan {
             ));
         }

-        let mut stream = execute_stream(self.input.clone(), ctx)?;
+        let mut input = execute_stream(self.input.clone(), ctx)?;
         let coll = self.collection.clone();

-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            self.input.schema(),
-            futures::stream::once(async move {
-                let mut count: u64 = 0;
-                while let Some(batch) = stream.next().await {
-                    let rb = batch?;
-
-                    let mut docs = Vec::with_capacity(rb.num_rows());
-                    let converted = crate::bson::BsonBatchConverter::from_record_batch(rb);
-
-                    for d in converted.into_iter() {
-                        let doc = d.map_err(|e| DataFusionError::Execution(e.to_string()))?;
-
-                        docs.push(
-                            RawDocumentBuf::from_document(&doc)
-                                .map_err(|e| DataFusionError::Execution(e.to_string()))?,
-                        );
-                    }
+        Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
+            RecordBatchStreamAdapter::new(
+                COUNT_SCHEMA.clone(),
+                futures::stream::once(async move {
+                    let mut count: u64 = 0;
+                    while let Some(batch) = input.next().await {
+                        let rb = batch?;
+
+                        let mut docs = Vec::with_capacity(rb.num_rows());
+                        let converted = crate::bson::BsonBatchConverter::from_record_batch(rb);
+
+                        for d in converted.into_iter() {
+                            let doc = d.map_err(|e| DataFusionError::Execution(e.to_string()))?;
+
+                            docs.push(
+                                RawDocumentBuf::from_document(&doc)
+                                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+                            );
+                        }

-                    count += coll
-                        .insert_many(docs, None)
-                        .await
-                        .map(|res| res.inserted_ids.len())
-                        .map_err(|e| DataFusionError::External(Box::new(e)))?
-                        as u64;
-                }
-                Ok::<RecordBatch, DataFusionError>(create_count_record_batch(count))
-            }),
+                        count += coll
+                            .insert_many(docs, None)
+                            .await
+                            .map(|res| res.inserted_ids.len())
+                            .map_err(|e| DataFusionError::External(Box::new(e)))?
+                            as u64;
+                    }
+                    Ok::<RecordBatch, DataFusionError>(create_count_record_batch(count))
+                })
+                .boxed(),
+            ),
+            partition,
+            &self.metrics,
         )))
     }

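One detail common to both hunks: the inner RecordBatchStreamAdapter is now constructed with COUNT_SCHEMA instead of the input's schema, which matches what these insert plans actually emit — a single batch produced by create_count_record_batch(count) holding the number of rows written. A hypothetical sketch of what that pair presumably looks like (the real definitions live elsewhere in the GlareDB crates and may use a different column name or construction):

```rust
// Hypothetical sketch: COUNT_SCHEMA and create_count_record_batch as they
// presumably look; the real definitions may differ (column name, nullability,
// error handling).
use std::sync::Arc;

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use once_cell::sync::Lazy;

static COUNT_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
    Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
});

/// Builds the single-row batch an insert plan returns: one UInt64 column
/// holding the number of rows written.
fn create_count_record_batch(count: u64) -> RecordBatch {
    RecordBatch::try_new(
        COUNT_SCHEMA.clone(),
        vec![Arc::new(UInt64Array::from(vec![count]))],
    )
    .expect("count batch matches COUNT_SCHEMA")
}
```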
