Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] add fuzz test for topk #7772

Merged
merged 2 commits into from
Oct 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 244 additions & 64 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,89 +22,100 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use arrow_array::{Float64Array, StringArray};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::{
datasource::MemTable,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
};
use datafusion_common::{
cast::{as_float64_array, as_string_array},
TableReference,
};
use datafusion_execution::memory_pool::GreedyMemoryPool;
use rand::Rng;
use datafusion_physical_expr::expressions::col;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::sync::Arc;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};
use test_utils::{batches_to_vec, partitions_to_sorted_vec, stagger_batch};

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const KB: u64 = 1 << 10;

const KB: usize = 1 << 10;
#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_1k_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(10240)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(10240)
.with_should_spill(true)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(10240)
.with_should_spill(true)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, true), (1000000, true)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(10 * KB)
.with_should_spill(should_spill)
.run()
.await;
}
}

#[tokio::test]
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(102400)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(102400)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(102400)
.with_should_spill(true)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, true)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(100 * KB)
.with_should_spill(should_spill)
.run()
.await;
}
}

#[tokio::test]
async fn test_sort_unlimited_mem() {
SortTest::new()
.with_int32_batches(5)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(20000)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;

SortTest::new()
.with_int32_batches(1000000)
.with_pool_size(usize::MAX)
.with_should_spill(false)
.run()
.await;
for (batch_size, should_spill) in [(5, false), (20000, false), (1000000, false)] {
SortTest::new()
.with_int32_batches(batch_size)
.with_pool_size(usize::MAX)
.with_should_spill(should_spill)
.run()
.await;
}
}

#[tokio::test]
async fn test_sort_topk() {
for size in [10, 100, 1000, 10000, 1000000] {
let mut topk_scenario = TopKScenario::new()
.with_limit(10)
.with_table_name("t")
.with_col_name("x");

// test topk with i32
let collected_i32 = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::I32))
.run_with_limit(&topk_scenario)
.await;
let actual = batches_to_vec(&collected_i32);
let excepted_i32 = topk_scenario.excepted_i32();
assert_eq!(actual, excepted_i32);

// test topk with f64
let collected_f64 = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::F64))
.run_with_limit(&topk_scenario)
.await;
let actual: Vec<Option<f64>> = batches_to_f64_vec(&collected_f64);
let excepted_f64 = topk_scenario.excepted_f64();
assert_eq!(actual, excepted_f64);

// test topk with str
let collected_str = SortTest::new()
.with_input(topk_scenario.batches(size, ColType::Str))
.run_with_limit(&topk_scenario)
.await;
let actual: Vec<Option<&str>> = batches_to_str_vec(&collected_str);
let excepted_str = topk_scenario.excepted_str();
assert_eq!(actual, excepted_str);
}
}

#[derive(Debug, Default)]
Expand All @@ -121,6 +132,11 @@ impl SortTest {
Default::default()
}

fn with_input(mut self, batches: Vec<Vec<RecordBatch>>) -> Self {
self.input = batches.clone();
self
}

/// Create batches of int32 values of rows
fn with_int32_batches(mut self, rows: usize) -> Self {
self.input = vec![make_staggered_i32_batches(rows)];
Expand All @@ -138,6 +154,44 @@ impl SortTest {
self
}

async fn run_with_limit<'a>(
&self,
topk_scenario: &TopKScenario<'a>,
) -> Vec<RecordBatch> {
let input = self.input.clone();
let schema = input
.iter()
.flat_map(|p| p.iter())
.next()
.expect("at least one batch")
.schema();

let table = MemTable::try_new(schema, input.clone()).unwrap();

let ctx = SessionContext::new();

ctx.register_table(
TableReference::Bare {
table: topk_scenario.table_name.into(),
},
Arc::new(table),
)
.unwrap();

let df = ctx
.table(topk_scenario.table_name)
.await
.unwrap()
.sort(vec![
datafusion_expr::col(topk_scenario.col_name).sort(true, true)
])
.unwrap()
.limit(0, Some(topk_scenario.limit))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified the plan here has the expected TopK element:

| physical_plan                                              | GlobalLimitExec: skip=0, fetch=10                    |
|                                                            |   SortExec: TopK(fetch=10), expr=[x@0 ASC]           |
|                                                            |     MemoryExec: partitions=1, partition_sizes=[38]   |

.unwrap();

df.collect().await.unwrap()
}

/// Sort the input using SortExec and ensure the results are
/// correct according to `Vec::sort` both with and without spilling
async fn run(&self) {
Expand Down Expand Up @@ -208,6 +262,109 @@ impl SortTest {
}
}

enum ColType {
I32,
F64,
Str,
}

struct TopKScenario<'a> {
limit: usize,
batches: Vec<Vec<RecordBatch>>,
table_name: &'a str,
col_name: &'a str,
}

impl<'a> TopKScenario<'a> {
fn new() -> Self {
TopKScenario {
limit: 0,
batches: vec![],
table_name: "",
col_name: "",
}
}

fn with_limit(mut self, limit: usize) -> Self {
self.limit = limit;
self
}

fn with_table_name(mut self, table_name: &'a str) -> Self {
self.table_name = table_name;
self
}

fn with_col_name(mut self, col_name: &'a str) -> Self {
self.col_name = col_name;
self
}

fn batches(&mut self, len: usize, t: ColType) -> Vec<Vec<RecordBatch>> {
let batches = match t {
ColType::I32 => make_staggered_i32_batches(len),
ColType::F64 => make_staggered_f64_batches(len),
ColType::Str => make_staggered_str_batches(len),
};
self.batches = vec![batches];
self.batches.clone()
}

fn excepted_i32(&self) -> Vec<Option<i32>> {
let excepted = partitions_to_sorted_vec(&self.batches);
excepted[0..self.limit].into()
}

fn excepted_f64(&self) -> Vec<Option<f64>> {
let mut excepted: Vec<Option<f64>> = self
.batches
.iter()
.flat_map(|batches| batches_to_f64_vec(batches).into_iter())
.collect();
excepted.sort_by(|a, b| a.partial_cmp(b).unwrap());
excepted[0..self.limit].into()
}

fn excepted_str(&self) -> Vec<Option<&str>> {
let mut excepted: Vec<Option<&str>> = self
.batches
.iter()
.flat_map(|batches| batches_to_str_vec(batches).into_iter())
.collect();
excepted.sort_unstable();
excepted[0..self.limit].into()
}
}

impl Default for TopKScenario<'_> {
fn default() -> Self {
Self::new()
}
}

fn make_staggered_f64_batches(len: usize) -> Vec<RecordBatch> {
let mut rng = StdRng::seed_from_u64(100);
let remainder = RecordBatch::try_from_iter(vec![(
"x",
Arc::new(Float64Array::from_iter_values(
(0..len).map(|_| rng.gen_range(0.0..1000.7)),
)) as ArrayRef,
)])
.unwrap();
stagger_batch(remainder)
}

fn make_staggered_str_batches(len: usize) -> Vec<RecordBatch> {
let remainder = RecordBatch::try_from_iter(vec![(
"x",
Arc::new(StringArray::from_iter_values(
(0..len).map(|_| get_random_string(6)),
)) as ArrayRef,
)])
.unwrap();
stagger_batch(remainder)
}

/// Return randomly sized record batches in a field named 'x' of type `Int32`
/// with randomized i32 content
fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
Expand All @@ -232,3 +389,26 @@ fn make_staggered_i32_batches(len: usize) -> Vec<RecordBatch> {
}
batches
}

/// Return random ASCII String with len
fn get_random_string(len: usize) -> String {
rand::thread_rng()
.sample_iter(rand::distributions::Alphanumeric)
.take(len)
.map(char::from)
.collect()
}

fn batches_to_f64_vec(batches: &[RecordBatch]) -> Vec<Option<f64>> {
batches
.iter()
.flat_map(|batch| as_float64_array(batch.column(0)).unwrap().iter())
.collect()
}

fn batches_to_str_vec(batches: &[RecordBatch]) -> Vec<Option<&str>> {
batches
.iter()
.flat_map(|batch| as_string_array(batch.column(0)).unwrap().iter())
.collect()
}