
Commit

Add in more index info
alamb committed May 16, 2024
1 parent 9120f47 commit 266b932
Showing 1 changed file with 106 additions and 45 deletions.
151 changes: 106 additions & 45 deletions datafusion-examples/examples/parquet_index.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::listing::PartitionedFile;
@@ -33,7 +33,7 @@ use datafusion::prelude::*;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
internal_datafusion_err, plan_datafusion_err, DFSchema, DataFusionError, Result,
internal_datafusion_err, DFSchema, DataFusionError, Result,
Statistics,
};
use datafusion_expr::utils::conjunction;
@@ -57,7 +57,7 @@ use url::Url;
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
#[tokio::main]
async fn main() -> Result<()> {
// Create some dummy data for this exampl
// Create some dummy data for this example
let data = DemoData::try_new()?;

// Create the table provider that knows how to read the parquet files and their metadata
@@ -70,7 +70,7 @@ async fn main() -> Result<()> {
ctx.register_table("index_table", Arc::new(provider))?;
// register file:// object store provider
// Get this error if not there:
// Error: Internal("No suitable object store found for file://data/")
// Error: Internal("No suitable object store found for file://")
// TODO: should make the error more helpful (and add an example of how to register local file object store)
// todo add example of how to register local file object store
let url = Url::try_from("file://")
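// Illustrative sketch, not part of this commit: one way to register a local
// filesystem object store so `file://` URLs resolve, assuming
// `object_store::local::LocalFileSystem` and the `ctx` SessionContext created
// above (the elided lines below may do this differently):
//
//     let object_store = Arc::new(object_store::local::LocalFileSystem::new());
//     ctx.register_object_store(&url, object_store);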
@@ -110,42 +110,29 @@ async fn main() -> Result<()> {
/// stored index
///```
pub struct IndexTableProvider {
/// what is the schema of this table?
schema: SchemaRef,
/// What are the paths of the parquet files?
/// The index of the parquet files in the directory
index: ParquetMetadataIndex,
/// The files (TODO remove)
files: Vec<Vec<PartitionedFile>>,
}



impl IndexTableProvider {
/// Create a new IndexTableProvider
pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> {
// Create an index of the parquet files in the directory as we see them.
let mut index_builder = ParquetMetadataIndexBuilder::new();

let dir = dir.into();

let files = dir
.read_dir()
.map_err(|e| {
DataFusionError::from(e)
.context(format!("Error reading directory {dir:?}"))
})?
.map(|entry| {
entry.map_err(|e| {
DataFusionError::from(e)
.context(format!("Error reading directory entry in {dir:?}"))
})
})
.collect::<Result<Vec<DirEntry>>>()?;
let files = read_dir(&dir)?;
for file in &files {
index_builder.add_file(&file.path())?;
}

// Get the schema for the parquet files in the given directory, assuming they
// are all the same
//
// Note this could be done with SchemaAdapter once DataFusion 39 is released
let first_file = files
.first()
.ok_or_else(|| plan_datafusion_err!("No files in directory {dir:?}"))?
.path();
let index = index_builder.build()?;

let schema = get_schema(&first_file)?;
// todo figure out actual indexing

// todo make a nicer API for this ("partitioned files" from directory)
let files = files
@@ -160,7 +147,7 @@ impl IndexTableProvider {
.collect::<Result<Vec<_>>>()?;
let files = vec![files];

Ok(Self { schema, files })
Ok(Self { index, files })
}
}

@@ -186,7 +173,7 @@ impl TableProvider for IndexTableProvider {
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
self.index.schema().clone()
}

fn table_type(&self) -> TableType {
@@ -222,7 +209,7 @@ impl TableProvider for IndexTableProvider {
object_store_url,
file_schema: self.schema(),
file_groups,
statistics: Statistics::new_unknown(&self.schema),
statistics: Statistics::new_unknown(self.index.schema()),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
@@ -251,28 +238,105 @@ impl TableProvider for IndexTableProvider {
/// directly by the DataFusion [`PruningPredicate`] API
///
/// The index looks like
/// ```
/// (file_name, file_size, row_count, value_column_null_count, value_column_min, value_column_max)
///```
/// | file_name | file_size | row_count | value_column_null_count | value_column_min | value_column_max |
/// |-----------|-----------|-----------|-------------------------|------------------|------------------|
/// | file1.parquet | 1000 | 100 | 0 | 0 | 100 |
/// | file2.parquet | 1000 | 100 | 0 | 100 | 200 |
/// | file3.parquet | 1000 | 100 | 0 | 200 | 300 |
///
/// // TODO eventually store this information for each row group within a file
/// the max value in the file
struct ParquetMetadataIndex {
file_schema: SchemaRef,
index: RecordBatch,
}

impl ParquetMetadataIndex {
fn schema(&self) -> &SchemaRef {
&self.file_schema
}
}
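
// Illustrative sketch, not part of this commit: how the index's min/max columns
// could prune files for a point predicate such as `value = 150`. This is a
// hand-rolled check rather than the `PruningPredicate` API mentioned above, the
// helper name `files_that_may_contain` is hypothetical, and the `value` column
// is assumed to be Int32 as in the demo data:
//
//     use arrow::array::AsArray;
//     use arrow::datatypes::Int32Type;
//
//     fn files_that_may_contain(index: &RecordBatch, target: i32) -> Vec<usize> {
//         let mins = index.column_by_name("value_column_min").unwrap().as_primitive::<Int32Type>();
//         let maxs = index.column_by_name("value_column_max").unwrap().as_primitive::<Int32Type>();
//         (0..index.num_rows())
//             // keep file i only if target falls within that file's [min, max] range
//             .filter(|&i| mins.value(i) <= target && target <= maxs.value(i))
//             .collect()
//     }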

/// Builds [`ParquetMetadataIndex`] from a set of parquet files
#[derive(Debug, Default)]
struct ParquetMetadataIndexBuilder {
file_schema: Option<SchemaRef>,
filenames: Vec<String>,
file_sizes: Vec<usize>,
row_counts: Vec<usize>,
value_column_null_counts: Vec<usize>,
value_column_mins: Vec<i32>,
value_column_maxs: Vec<i32>,
file_sizes: Vec<u64>,
row_counts: Vec<u64>,
value_column_null_counts: Vec<u64>,
/// Holds the min/max value of the value column for each file as a single
/// value array.
///
/// In the more advanced parquet index, this will hold values for each row
/// group within a file
value_column_mins: Vec<ArrayRef>,
value_column_maxs: Vec<ArrayRef>,
}


impl ParquetMetadataIndexBuilder {
fn new() -> Self {
Self::default()
}

/// Add a file to the index
fn add_file(&mut self, filename: &Path) -> Result<()> {
println!("Adding file {:?}", filename);
if self.file_schema.is_none() {
self.file_schema = Some(get_schema(filename)?);
}


todo!();
}
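
// Illustrative sketch, not part of this commit: one possible way to fill in
// `add_file` using footer metadata from the parquet crate. The column index `1`
// for the `value` column and the use of per-row-group statistics are assumptions
// for illustration only:
//
//     use parquet::file::reader::{FileReader, SerializedFileReader};
//     use parquet::file::statistics::Statistics as ParquetStats;
//
//     let file = std::fs::File::open(filename)?;
//     let reader = SerializedFileReader::new(file)?;
//     let metadata = reader.metadata();
//     let row_count = metadata.file_metadata().num_rows();
//     for row_group in metadata.row_groups() {
//         if let Some(ParquetStats::Int32(stats)) = row_group.column(1).statistics() {
//             // fold the row group's min, max, and null count into the
//             // per-file values accumulated in the builder's vectors
//         }
//     }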

/// Build the index from the files added
fn build(self) -> Result<ParquetMetadataIndex> {
let Some(file_schema) = self.file_schema else {
return Err(internal_datafusion_err!("No files added to index"));
};

let value_column_mins = concat_arrays(&self.value_column_mins)?;
let value_column_maxs = concat_arrays(&self.value_column_maxs)?;

let index = RecordBatch::try_from_iter(vec![
("file_name", Arc::new(StringArray::from(self.filenames)) as ArrayRef),
("file_size", Arc::new(UInt64Array::from(self.file_sizes)) as ArrayRef),
("row_count", Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef),
("value_column_null_count", Arc::new(UInt64Array::from(self.value_column_null_counts)) as ArrayRef),
("value_column_min", value_column_mins),
("value_column_max", value_column_maxs),
])?;

Ok(ParquetMetadataIndex { file_schema, index })
}
}

/// Return a list of the directory entries in the given directory, sorted by name
fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
let mut files = dir
.read_dir()
.map_err(|e| {
DataFusionError::from(e)
.context(format!("Error reading directory {dir:?}"))
})?
.map(|entry| {
entry.map_err(|e| {
DataFusionError::from(e)
.context(format!("Error reading directory entry in {dir:?}"))
})
})
.collect::<Result<Vec<DirEntry>>>()?;
files.sort_by(|a, b| a.file_name().cmp(&b.file_name()));
Ok(files)
}

fn concat_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
// `arrow::compute::concat` takes `&[&dyn Array]`, so convert each `ArrayRef` to a plain reference first
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
Ok(arrow::compute::concat(&arrays)?)
}

/// Demonstration Data
///
@@ -288,9 +352,6 @@ struct DemoData {
impl DemoData {
fn try_new() -> Result<Self> {
let tmpdir = TempDir::new()?;
let data_dir = tmpdir.path().join("data");
fs::create_dir_all(&data_dir)
.map_err(|e| internal_datafusion_err!("Error creating data dir: {e}"))?;
make_demo_file(tmpdir.path().join("file1.parquet"), 0..100)?;
make_demo_file(tmpdir.path().join("file2.parquet"), 100..200)?;
make_demo_file(tmpdir.path().join("file3.parquet"), 200..300)?;
