From 064dcc2922608437e8bd743e3890c9dcdbf272b6 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 11 Sep 2022 16:25:42 +0800 Subject: [PATCH 01/22] feat(query): new input format framework. --- Cargo.lock | 5 + src/query/pipeline/sources/Cargo.toml | 11 + .../sources/input_formats/delimiter.rs | 57 +++ .../impls/input_format_parquet.rs | 138 +++++++ .../input_formats/impls/input_format_tsv.rs | 134 +++++++ .../sources/input_formats/impls/mod.rs | 18 + .../sources/input_formats/input_context.rs | 245 ++++++++++++ .../sources/input_formats/input_format.rs | 148 +++++++ .../input_formats/input_format_text.rs | 372 ++++++++++++++++++ .../sources/input_formats/input_pipeline.rs | 273 +++++++++++++ .../processors/sources/input_formats/mod.rs | 26 ++ .../sources/input_formats/source_aligner.rs | 172 ++++++++ .../input_formats/source_deserializer.rs | 116 ++++++ .../input_formats/transform_deserializer.rs | 150 +++++++ .../sources/src/processors/sources/mod.rs | 1 + .../src/storages/stage/stage_source.rs | 1 + 16 files changed, 1867 insertions(+) create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs diff --git a/Cargo.lock b/Cargo.lock index cb1dc7d4574ea..e3b1e4677f95f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1681,21 +1681,26 @@ dependencies = [ name = "common-pipeline-sources" version = "0.1.0" dependencies = [ + "async-channel", "async-trait-fn", + "common-arrow", "common-base", "common-catalog", "common-datablocks", + "common-datavalues", "common-exception", "common-formats", "common-io", "common-meta-types", "common-pipeline-core", + "common-settings", "common-storage", "common-streams", "futures", "futures-util", "opendal", "parking_lot 0.12.1", + "tracing", ] [[package]] diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index 1adbe3d98573b..5ded4dfe2724d 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -9,19 +9,30 @@ doctest = false test = false [dependencies] +async-channel = "1.7.1" +common-arrow = { path = "../../../common/arrow" } common-base = { path = "../../../common/base" } common-catalog = { path = "../../catalog" } common-datablocks = { path = "../../datablocks" } +common-datavalues = { path = "../../datavalues" } common-exception = { path = "../../../common/exception" } common-formats = { path = "../../formats" } common-io = { path = 
"../../../common/io" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../core" } +common-settings = { path = "../../settings" } common-storage = { path = "../../../common/storage" } common-streams = { path = "../../streams" } async-trait = { version = "0.1.0", package = "async-trait-fn" } +bstr = "0.2.17" +crossbeam-channel = "0.5.6" +csv-core = "0.1.10" futures = "0.3.21" futures-util = "0.3.21" opendal = { version = "0.17.1", features = ["layers-retry", "compress"] } parking_lot = "0.12.1" +serde_json = "1.0.81" +similar-asserts = "1.2.0" +tracing = "0.1.35" + diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs new file mode 100644 index 0000000000000..33ac7ecc15e9f --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs @@ -0,0 +1,57 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_exception::ErrorCode; +use common_exception::Result; + +pub enum RecordDelimiter { + Crlf, + Any(u8), +} + +impl RecordDelimiter { + pub fn end(&self) -> u8 { + match self { + RecordDelimiter::Crlf => b'\n', + RecordDelimiter::Any(b) => *b, + } + } +} + +impl TryFrom<&str> for RecordDelimiter { + type Error = ErrorCode; + fn try_from(s: &str) -> Result { + Self::try_from(s.as_bytes()) + } +} + +impl TryFrom<&[u8]> for RecordDelimiter { + type Error = ErrorCode; + fn try_from(s: &[u8]) -> Result { + match s.len() { + 1 => Ok(RecordDelimiter::Any(s[0])), + 2 if s.eq(b"\r\n") => Ok(RecordDelimiter::Crlf), + _ => Err(ErrorCode::InvalidArgument(format!( + "bad RecordDelimiter: '{:?}'", + s + ))), + } + } +} + +impl Default for RecordDelimiter { + fn default() -> Self { + RecordDelimiter::Crlf + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs new file mode 100644 index 0000000000000..fa3669cda8d92 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -0,0 +1,138 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use common_arrow::parquet::metadata::RowGroupMetaData; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::Object; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; +use crate::processors::sources::input_formats::InputFormat; + +struct InputFormatParquet; + +#[async_trait::async_trait] +impl InputFormat for InputFormatParquet { + async fn read_file_meta( + &self, + obj: &Object, + size: usize, + ) -> Result>> { + todo!() + } + + async fn read_split_meta( + &self, + obj: &Object, + split_info: &SplitInfo, + ) -> Result>> { + todo!() + } + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { + todo!() + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + todo!() + } + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + todo!() + } +} + +pub struct ParquetFormatPipe; + +#[async_trait::async_trait] +impl InputFormatPipe for ParquetFormatPipe { + type ReadBatch = ReadBatch; + type RowBatch = RowGroupInMemory; + type AligningState = AligningState; + type BlockBuilder = ParquetBlockBuilder; +} + +pub struct SplitMeta { + row_groups: Vec, +} + +pub struct RowGroupInMemory {} + +impl Debug for RowGroupInMemory { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RowGroupInMemory") + } +} + +#[derive(Debug)] +pub enum ReadBatch { + Buffer(Vec), + RowGroup(RowGroupInMemory), +} + +impl From> for ReadBatch { + fn from(v: Vec) -> Self { + Self::Buffer(v) + } +} + +pub struct ParquetBlockBuilder { + ctx: Arc, +} + +impl BlockBuilderTrait for ParquetBlockBuilder { + type Pipe = ParquetFormatPipe; + + fn create(ctx: Arc) -> Self { + ParquetBlockBuilder { ctx } + } + + fn deserialize(&mut self, batch: Option) -> Result> { + todo!() + } +} + +pub struct AligningState { + buffers: Vec>, +} + +impl AligningStateTrait for AligningState { + type Pipe = ParquetFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + todo!() + } + + fn align(&mut self, read_batch: Option) -> Result> { + todo!() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs new file mode 100644 index 0000000000000..59a0058f9bf2b --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -0,0 +1,134 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datavalues::TypeDeserializer; +use common_exception::ErrorCode; +use common_exception::Result; +use common_formats::verbose_string; +use common_io::prelude::BufferReadExt; +use common_io::prelude::FormatSettings; +use common_io::prelude::NestedCheckpointReader; +use common_meta_types::StageFileFormatType; + +use crate::processors::sources::input_formats::input_format_text::AligningState; +use crate::processors::sources::input_formats::input_format_text::BlockBuilder; +use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase; +use crate::processors::sources::input_formats::input_format_text::RowBatch; + +pub struct InputFormatTSV {} + +impl InputFormatTSV { + fn read_row( + buf: &[u8], + deserializers: &mut Vec, + format_settings: &FormatSettings, + path: &str, + offset: usize, + row_index: Option, + ) -> Result<()> { + let num_columns = deserializers.len(); + let mut column_index = 0; + let mut field_start = 0; + let mut pos = 0; + let mut err_msg = None; + let buf_len = buf.len(); + while pos <= buf_len { + if pos == buf_len || buf[pos] == b'\t' { + let col_data = &buf[field_start..pos]; + if col_data.is_empty() { + deserializers[column_index].de_default(format_settings); + } else { + let mut reader = NestedCheckpointReader::new(col_data); + reader.ignores(|c: u8| c == b' ').expect("must success"); + if let Err(e) = + deserializers[column_index].de_text(&mut reader, format_settings) + { + err_msg = Some(format!( + "fail to decode column {}: {:?}, [column_data]=[{}]", + column_index, e, "" + )); + break; + }; + // todo(youngsofun): check remaining data + } + column_index += 1; + field_start = pos + 1; + if column_index > num_columns { + err_msg = Some("too many columns".to_string()); + break; + } + } + pos += 1; + } + if column_index < num_columns - 1 { + // todo(youngsofun): allow it optionally (set default) + err_msg = Some(format!( + "need {} columns, find {} only", + num_columns, + column_index + 1 + )); + } + if let Some(m) = err_msg { + let row_info = if let Some(r) = row_index { + format!("at row {},", r) + } else { + String::new() + }; + let mut msg = format!( + "fail to parse tsv {} at offset {}, {} reason={}, row data: ", + path, + offset + pos, + row_info, + m + ); + verbose_string(buf, &mut msg); + Err(ErrorCode::BadBytes(msg)) + } else { + Ok(()) + } + } +} + +impl InputFormatTextBase for InputFormatTSV { + fn format_type() -> StageFileFormatType { + StageFileFormatType::Tsv + } + + fn default_field_delimiter() -> u8 { + b'\t' + } + + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()> { + let columns = &mut builder.mutable_columns; + let mut start = 0usize; + let start_row = batch.start_row; + for (i, end) in batch.row_ends.iter().enumerate() { + let buf = &batch.data[start..*end]; + Self::read_row( + buf, + columns, + &builder.ctx.format_settings, + &batch.path, + batch.offset + start, + start_row.map(|n| n + i), + )?; + start = *end + 1; + } + Ok(()) + } + + fn align(state: &mut AligningState, buf: &[u8]) -> Result> { + Ok(state.align_by_record_delimiter(buf)) + } +} diff --git 
a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs new file mode 100644 index 0000000000000..cd6abe156cb4d --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod input_format_csv; +pub mod input_format_ndjson; +pub mod input_format_parquet; +pub mod input_format_tsv; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs new file mode 100644 index 0000000000000..ee1881ece2f4e --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -0,0 +1,245 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
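+//! `InputContext` bundles everything the input pipeline needs at runtime:
+//! the resolved `InputFormat`, the splits to read, delimiters, settings,
+//! format options and progress tracking. It is built once per COPY
+//! statement or streaming load.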
+ +use std::str::FromStr; +use std::sync::Arc; + +use common_base::base::Progress; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_io::prelude::FormatSettings; +use common_meta_types::StageFileCompression; +use common_meta_types::StageFileFormatType; +use common_meta_types::UserStageInfo; +use common_settings::Settings; +use opendal::io_util::CompressAlgorithm; +use opendal::Operator; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_csv::InputFormatCSV; +use crate::processors::sources::input_formats::impls::input_format_ndjson::InputFormatNDJson; +use crate::processors::sources::input_formats::impls::input_format_parquet::InputFormatParquet; +use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_format_text::InputFormatText; +use crate::processors::sources::input_formats::InputFormat; + +pub enum InputPlan { + CopyInto(Box), + StreamingLoad, +} + +pub struct CopyIntoPlan { + pub stage_info: UserStageInfo, + pub files: Vec, +} + +pub struct InputContext { + pub plan: InputPlan, + pub schema: DataSchemaRef, + pub operator: Operator, + pub format: Arc, + pub splits: Vec, + + // row format only + pub rows_to_skip: usize, + pub field_delimiter: u8, + pub record_delimiter: RecordDelimiter, + + // runtime config + pub settings: Arc, + pub format_settings: FormatSettings, + + pub read_batch_size: usize, + pub rows_per_block: usize, + + pub scan_progress: Arc, +} + +impl InputContext { + pub fn get_input_format(format: &StageFileFormatType) -> Result> { + match format { + StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::::create())), + StageFileFormatType::Csv => Ok(Arc::new(InputFormatText::::create())), + StageFileFormatType::NdJson => { + Ok(Arc::new(InputFormatText::::create())) + } + StageFileFormatType::Parquet => Ok(Arc::new(InputFormatParquet {})), + format => Err(ErrorCode::LogicalError(format!( + "Unsupported file format: {:?}", + format + ))), + } + } + + pub async fn try_create_from_copy( + operator: Operator, + settings: Arc, + format_settings: FormatSettings, + schema: DataSchemaRef, + stage_info: UserStageInfo, + files: Vec, + scan_progress: Arc, + ) -> Result { + let plan = Box::new(CopyIntoPlan { stage_info, files }); + let read_batch_size = 1024 * 1024; + let split_size = 128usize * 1024 * 1024; + let file_format_options = &plan.stage_info.file_format_options; + let format = Self::get_input_format(&file_format_options.format)?; + let file_infos = Self::get_file_infos(&format, &operator, &plan).await?; + let splits = format.split_files(file_infos, split_size); + let rows_per_block = settings.get_max_block_size()? as usize; + let record_delimiter = { + if file_format_options.record_delimiter.is_empty() { + format.default_record_delimiter() + } else { + RecordDelimiter::try_from(file_format_options.record_delimiter.as_str())? 
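+                // a non-empty stage option overrides the format default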
+ } + }; + + let rows_to_skip = file_format_options.skip_header as usize; + let field_delimiter = { + if file_format_options.field_delimiter.is_empty() { + format.default_field_delimiter() + } else { + file_format_options.field_delimiter.as_bytes()[0] + } + }; + Ok(InputContext { + format, + schema, + operator, + splits, + settings, + format_settings, + record_delimiter, + rows_per_block, + read_batch_size, + plan: InputPlan::CopyInto(plan), + rows_to_skip, + field_delimiter, + scan_progress, + }) + } + + #[allow(unused)] + async fn try_create_from_insert( + format_name: &str, + operator: Operator, + settings: Arc, + format_settings: FormatSettings, + schema: DataSchemaRef, + scan_progress: Arc, + ) -> Result { + let format = + StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?; + let format = Self::get_input_format(&format)?; + let read_batch_size = 1024 * 1024; + let rows_per_block = settings.get_max_block_size()? as usize; + let field_delimiter = settings.get_field_delimiter()?; + let field_delimiter = { + if field_delimiter.is_empty() { + format.default_field_delimiter() + } else { + field_delimiter.as_bytes()[0] + } + }; + let record_delimiter = RecordDelimiter::try_from(&settings.get_record_delimiter()?[..])?; + let rows_to_skip = settings.get_skip_header()? as usize; + Ok(InputContext { + format, + schema, + operator, + settings, + format_settings, + record_delimiter, + rows_per_block, + read_batch_size, + field_delimiter, + rows_to_skip, + scan_progress, + plan: InputPlan::StreamingLoad, + splits: Default::default(), + }) + } + + async fn get_file_infos( + format: &Arc, + op: &Operator, + plan: &CopyIntoPlan, + ) -> Result> { + let mut infos = vec![]; + for p in &plan.files { + let obj = op.object(p); + let size = obj.metadata().await?.content_length() as usize; + let file_meta = format.read_file_meta(&obj, size).await?; + let compress_alg = InputContext::get_compression_alg_copy( + plan.stage_info.file_format_options.compression, + p, + )?; + let info = FileInfo { + path: p.clone(), + size, + compress_alg, + file_meta, + }; + infos.push(info) + } + Ok(infos) + } + + pub fn num_prefetch_splits(&self) -> Result { + Ok(self.settings.get_max_threads()? 
as usize) + } + + pub fn num_prefetch_per_split(&self) -> usize { + 1 + } + + pub fn get_compression_alg(&self, path: &str) -> Result> { + let opt = match &self.plan { + InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression, + _ => StageFileCompression::None, + }; + Self::get_compression_alg_copy(opt, path) + } + + pub fn get_compression_alg_copy( + compress_option: StageFileCompression, + path: &str, + ) -> Result> { + let compression_algo = match compress_option { + StageFileCompression::Auto => CompressAlgorithm::from_path(path), + StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip), + StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2), + StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli), + StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd), + StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib), + StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate), + StageFileCompression::Xz => Some(CompressAlgorithm::Xz), + StageFileCompression::Lzo => { + return Err(ErrorCode::UnImplement("compress type lzo is unimplemented")); + } + StageFileCompression::Snappy => { + return Err(ErrorCode::UnImplement( + "compress type snappy is unimplemented", + )); + } + StageFileCompression::None => None, + }; + Ok(compression_algo) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs new file mode 100644 index 0000000000000..8dda2c8f1e7fb --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs @@ -0,0 +1,148 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
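+//! The `InputFormat` trait ties a file format to the pipeline: it splits
+//! files into `SplitInfo`s, optionally reads file/split level metadata,
+//! and wires up the COPY (`exec_copy`) and streaming-load (`exec_stream`)
+//! execution paths.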
+ +use std::any::Any; +use std::cmp::min; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::io_util::CompressAlgorithm; +use opendal::Object; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputData: Send + Sync + 'static { + fn as_any(&self) -> &dyn Any; +} + +pub trait InputState: Send { + fn as_any(&mut self) -> &mut dyn Any; +} + +#[async_trait::async_trait] +pub trait InputFormat: Send + Sync { + fn default_record_delimiter(&self) -> RecordDelimiter; + + fn default_field_delimiter(&self) -> u8; + + async fn read_file_meta(&self, obj: &Object, size: usize) + -> Result>>; + + async fn read_split_meta( + &self, + obj: &Object, + split_info: &SplitInfo, + ) -> Result>>; + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec; + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()>; + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()>; +} + +#[derive(Clone)] +pub struct FileInfo { + pub path: String, + pub size: usize, + pub compress_alg: Option, + pub file_meta: Option>, +} + +impl FileInfo { + pub fn split_by_size(&self, split_size: usize) -> Vec { + let mut splits = vec![]; + let n = (self.size + split_size - 1) / split_size; + for i in 0..n - 1 { + splits.push(SplitInfo { + file_info: self.clone(), + seq_infile: i, + is_end: i == n - 1, + offset: i * split_size, + len: min((i + 1) * split_size, self.size), + split_meta: None, + }) + } + splits + } +} + +impl Debug for FileInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileInfo") + .field("path", &self.path) + .field("size", &self.size) + .finish() + } +} + +#[derive(Clone)] +pub struct SplitInfo { + pub file_info: FileInfo, + pub seq_infile: usize, + pub is_end: bool, + pub offset: usize, + pub len: usize, + pub split_meta: Option>, +} + +impl SplitInfo { + pub fn from_file_info(file_info: FileInfo) -> Self { + let len = file_info.size; + Self { + file_info, + seq_infile: 0, + is_end: true, + offset: 0, + len, + split_meta: None, + } + } + + pub fn from_stream_split(path: String) -> Self { + SplitInfo { + file_info: FileInfo { + path, + size: 0, + compress_alg: None, + file_meta: None, + }, + seq_infile: 0, + offset: 0, + len: 0, + is_end: false, + split_meta: None, + } + } +} + +impl Debug for SplitInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SplitInfo") + .field("file_info", &self.file_info) + .field("seq_infile", &self.seq_infile) + .finish() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs new file mode 100644 index 0000000000000..9376f1ced9882 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -0,0 +1,372 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_datavalues::TypeDeserializer; +use common_datavalues::TypeDeserializerImpl; +use common_exception::ErrorCode; +use common_exception::Result; +use common_meta_types::StageFileFormatType; +use common_pipeline_core::Pipeline; +use opendal::io_util::DecompressDecoder; +use opendal::io_util::DecompressState; +use opendal::Object; + +use super::InputFormat; +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_csv::CsvReaderState; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputFormatTextBase: Sized + Send + Sync + 'static { + fn format_type() -> StageFileFormatType; + + fn is_splittable() -> bool { + false + } + + fn default_record_delimiter() -> RecordDelimiter { + RecordDelimiter::Crlf + } + + fn default_field_delimiter() -> u8; + + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()>; + + fn align(state: &mut AligningState, buf: &[u8]) -> Result>; +} + +pub struct InputFormatText { + phantom: PhantomData, +} + +impl InputFormatText { + pub fn create() -> Self { + Self { + phantom: Default::default(), + } + } +} + +pub struct InputFormatTextPipe { + phantom: PhantomData, +} + +#[async_trait::async_trait] +impl InputFormatPipe for InputFormatTextPipe { + type ReadBatch = Vec; + type RowBatch = RowBatch; + type AligningState = AligningState; + type BlockBuilder = BlockBuilder; +} + +#[async_trait::async_trait] +impl InputFormat for InputFormatText { + fn default_record_delimiter(&self) -> RecordDelimiter { + T::default_record_delimiter() + } + + fn default_field_delimiter(&self) -> u8 { + T::default_field_delimiter() + } + + async fn read_file_meta( + &self, + _obj: &Object, + _size: usize, + ) -> Result>> { + Ok(None) + } + + async fn read_split_meta( + &self, + _obj: &Object, + _split_info: &SplitInfo, + ) -> Result>> { + Ok(None) + } + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { + let mut splits = vec![]; + for f in file_infos { + if f.compress_alg.is_none() || !T::is_splittable() { + splits.push(SplitInfo::from_file_info(f)) + } else { + splits.append(&mut f.split_by_size(split_size)) + } + } + splits + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + tracing::info!("exe text"); + InputFormatTextPipe::::execute_copy_with_aligner(ctx, pipeline) + } + + fn 
exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + InputFormatTextPipe::::execute_stream(ctx, pipeline, input) + } +} + +#[derive(Default)] +pub struct RowBatch { + pub data: Vec, + pub row_ends: Vec, + pub field_ends: Vec, + + // for error info + pub path: String, + pub offset: usize, + pub start_row: Option, +} + +pub struct AligningState { + pub path: String, + pub record_delimiter_end: u8, + pub field_delimiter: u8, + pub rows: usize, + pub offset: usize, + pub rows_to_skip: usize, + pub tail_of_last_batch: Vec, + pub num_fields: usize, + pub decoder: Option, + pub csv_reader: Option, + phantom: PhantomData, +} + +impl AligningState { + pub fn align_by_record_delimiter(&mut self, buf_in: &[u8]) -> Vec { + let record_delimiter_end = self.record_delimiter_end; + let mut buf = buf_in; + if self.rows_to_skip > 0 { + let mut i = 0; + for b in buf.iter() { + if *b == record_delimiter_end { + self.rows_to_skip -= 1; + if self.rows_to_skip == 0 { + break; + } + } + i += 1; + } + if self.rows_to_skip > 0 { + self.tail_of_last_batch = vec![]; + return vec![]; + } else { + buf = &buf[i + 1..]; + } + } + if buf.is_empty() { + return vec![]; + } + + let mut output = RowBatch::default(); + let rows = &mut output.row_ends; + for (i, b) in buf.iter().enumerate() { + if *b == b'\n' { + rows.push(i) + } + } + let last = rows[rows.len() - 1]; + if rows.is_empty() { + self.tail_of_last_batch.extend_from_slice(buf); + vec![] + } else { + output.data = mem::take(&mut self.tail_of_last_batch); + output.data.extend_from_slice(&buf[0..last + 1]); + let size = output.data.len(); + output.path = self.path.to_string(); + self.offset += size; + tracing::debug!( + "align {} bytes to {} rows: {} .. {}", + size, + rows.len(), + rows[0], + last + ); + vec![output] + } + } + + fn flush(&mut self) -> Vec { + if self.tail_of_last_batch.is_empty() { + vec![] + } else { + // last row + let data = mem::take(&mut self.tail_of_last_batch); + let end = data.len(); + let row_batch = RowBatch { + data, + row_ends: vec![end], + field_ends: vec![], + path: self.path.to_string(), + offset: self.offset, + start_row: None, + }; + vec![row_batch] + } + } +} + +impl AligningStateTrait for AligningState { + type Pipe = InputFormatTextPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + let rows_to_skip = if split_info.seq_infile == 0 { + ctx.rows_to_skip + } else { + 0 + }; + let path = split_info.file_info.path.clone(); + + let decoder = ctx.get_compression_alg(&path)?.map(DecompressDecoder::new); + let csv_reader = if T::format_type() == StageFileFormatType::Csv { + Some(CsvReaderState::create(ctx)) + } else { + None + }; + + Ok(AligningState:: { + path, + decoder, + rows_to_skip, + csv_reader, + tail_of_last_batch: vec![], + rows: 0, + num_fields: ctx.schema.num_fields(), + offset: split_info.offset, + record_delimiter_end: ctx.record_delimiter.end(), + field_delimiter: ctx.field_delimiter, + phantom: Default::default(), + }) + } + + fn align(&mut self, read_batch: Option>) -> Result> { + let row_batches = if let Some(data) = read_batch { + let buf = if let Some(decoder) = self.decoder.as_mut() { + decompress(decoder, &data)? + } else { + data + }; + T::align(self, &buf)? 
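+                // the (decompressed) bytes are now cut into batches of complete rows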
+ } else { + if let Some(decoder) = &self.decoder { + assert_eq!(decoder.state(), DecompressState::Done) + } + self.flush() + }; + Ok(row_batches) + } +} + +pub struct BlockBuilder { + pub ctx: Arc, + pub mutable_columns: Vec, + pub num_rows: usize, + phantom: PhantomData, +} + +impl BlockBuilder { + fn flush(&mut self) -> Result> { + let mut columns = Vec::with_capacity(self.mutable_columns.len()); + for deserializer in &mut self.mutable_columns { + columns.push(deserializer.finish_to_column()); + } + self.mutable_columns = self + .ctx + .schema + .create_deserializers(self.ctx.rows_per_block); + self.num_rows = 0; + + Ok(vec![DataBlock::create(self.ctx.schema.clone(), columns)]) + } +} + +impl BlockBuilderTrait for BlockBuilder { + type Pipe = InputFormatTextPipe; + + fn create(ctx: Arc) -> Self { + let columns = ctx.schema.create_deserializers(ctx.rows_per_block); + BlockBuilder { + ctx, + mutable_columns: columns, + num_rows: 0, + phantom: Default::default(), + } + } + + fn deserialize(&mut self, batch: Option) -> Result> { + if let Some(b) = batch { + self.num_rows += b.row_ends.len(); + T::deserialize(self, b)?; + if self.num_rows >= self.ctx.rows_per_block { + self.flush() + } else { + Ok(vec![]) + } + } else { + self.flush() + } + } +} + +fn decompress(decoder: &mut DecompressDecoder, compressed: &[u8]) -> Result> { + let mut decompress_bufs = vec![]; + let mut amt = 0; + loop { + match decoder.state() { + DecompressState::Reading => { + if amt == compressed.len() { + break; + } + let read = decoder.fill(&compressed[amt..]); + amt += read; + } + DecompressState::Decoding => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.decode(&mut decompress_buf[..]).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Flushing => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.finish(&mut decompress_buf).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Done => break, + } + } + Ok(decompress_bufs.concat()) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs new file mode 100644 index 0000000000000..d878d029296b1 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -0,0 +1,273 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
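+//! Shape of an input pipeline: a feeder task reads each split and sends
+//! `ReadBatch`es to an `Aligner`, which cuts them into `RowBatch`es made of
+//! whole rows; deserializer processors then turn row batches into
+//! `DataBlock`s. Splits travel over a bounded async channel, row batches
+//! over a bounded crossbeam channel.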
+ +use std::fmt::Debug; +use std::sync::Arc; + +use common_base::base::tokio; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_base::base::tokio::sync::mpsc::Sender; +use common_base::base::GlobalIORuntime; +use common_base::base::TrySpawn; +use common_datablocks::DataBlock; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::Pipeline; +use common_pipeline_core::SourcePipeBuilder; +use futures_util::stream::FuturesUnordered; +use futures_util::AsyncReadExt; +use futures_util::StreamExt; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_context::InputPlan; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::source_aligner::Aligner; +use crate::processors::sources::input_formats::source_deserializer::DeserializeSource; +use crate::processors::sources::input_formats::transform_deserializer::DeserializeTransformer; + +pub struct Split { + pub(crate) info: SplitInfo, + pub(crate) rx: Receiver>, +} + +#[allow(unused)] +pub struct StreamingSplit { + path: String, + data_tx: Sender, +} + +pub struct StreamingReadBatch { + data: Vec, + pub(crate) path: String, + pub(crate) is_start: bool, +} + +pub trait AligningStateTrait: Sized { + type Pipe: InputFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result; + + fn align( + &mut self, + read_batch: Option<::ReadBatch>, + ) -> Result::RowBatch>>; +} + +pub trait BlockBuilderTrait { + type Pipe: InputFormatPipe; + fn create(ctx: Arc) -> Self; + + fn deserialize( + &mut self, + batch: Option<::RowBatch>, + ) -> Result>; +} + +#[async_trait::async_trait] +pub trait InputFormatPipe: Sized + Send + 'static { + type ReadBatch: From> + Send + Debug; + type RowBatch: Send; + type AligningState: AligningStateTrait + Send; + type BlockBuilder: BlockBuilderTrait + Send; + + fn execute_stream( + ctx: Arc, + pipeline: &mut Pipeline, + mut input: Receiver, + ) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + tokio::spawn(async move { + let mut sender: Option>> = None; + while let Some(batch) = input.recv().await { + if batch.is_start { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(1); + sender = Some(data_tx); + let split_info = SplitInfo::from_stream_split(batch.path.clone()); + split_tx + .send(Split { + info: split_info, + rx: data_rx, + }) + .await + .expect("fail to send split from stream"); + } + if let Some(s) = sender.as_mut() { + s.send(Ok(batch.data.into())) + .await + .expect("fail to send read batch from stream"); + } + } + }); + Ok(()) + } + + fn execute_copy_with_aligner(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + GlobalIORuntime::instance().spawn(async move { + tracing::debug!("start copy splits feeder"); + for s in &ctx_clone.splits { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(ctx.num_prefetch_per_split()); + let split_clone = s.clone(); + let ctx_clone2 = ctx_clone.clone(); + tokio::spawn(async move { + if let Err(e) = + Self::copy_reader_with_aligner(ctx_clone2, split_clone, data_tx).await + { + tracing::error!("copy split reader error: {:?}", e); + } 
else { + tracing::debug!("copy split reader stopped"); + } + }); + if split_tx + .send(Split { + info: s.clone(), + rx: data_rx, + }) + .await + .is_err() + { + break; + }; + } + tracing::info!("end copy splits feeder"); + }); + + Ok(()) + } + + fn execute_copy_aligned(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (data_tx, data_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_aligned(&ctx, data_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + let p = 3; + tokio::spawn(async move { + let mut futs = FuturesUnordered::new(); + for s in &ctx_clone.splits { + let fut = Self::read_split(ctx_clone.clone(), s.clone()); + futs.push(fut); + if futs.len() >= p { + let row_batch = futs.next().await.unwrap().unwrap(); + data_tx.send(row_batch).await.unwrap(); + } + } + + while let Some(row_batch) = futs.next().await { + data_tx.send(row_batch.unwrap()).await.unwrap(); + } + }); + Ok(()) + } + + fn build_pipeline_aligned( + ctx: &Arc, + row_batch_rx: async_channel::Receiver, + pipeline: &mut Pipeline, + ) -> Result<()> { + let mut builder = SourcePipeBuilder::create(); + for _ in 0..ctx.settings.get_max_threads()? { + let output = OutputPort::create(); + let source = DeserializeSource::::create( + ctx.clone(), + output.clone(), + row_batch_rx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + Ok(()) + } + + fn build_pipeline_with_aligner( + ctx: &Arc, + split_rx: async_channel::Receiver>, + pipeline: &mut Pipeline, + ) -> Result<()> { + let mut builder = SourcePipeBuilder::create(); + let n_threads = ctx.settings.get_max_threads()? as usize; + let max_aligner = match ctx.plan { + InputPlan::CopyInto(_) => ctx.splits.len(), + InputPlan::StreamingLoad => 3, + }; + let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads); + for _ in 0..std::cmp::min(max_aligner, n_threads) { + let output = OutputPort::create(); + let source = Aligner::::try_create( + output.clone(), + ctx.clone(), + split_rx.clone(), + row_batch_tx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + pipeline.resize(n_threads)?; + pipeline.add_transform(|input, output| { + DeserializeTransformer::::create(ctx.clone(), input, output, row_batch_rx.clone()) + })?; + Ok(()) + } + + async fn read_split(_ctx: Arc, _split_info: SplitInfo) -> Result { + unimplemented!() + } + + #[tracing::instrument(level = "debug", skip(ctx, batch_tx))] + async fn copy_reader_with_aligner( + ctx: Arc, + split_info: SplitInfo, + batch_tx: Sender>, + ) -> Result<()> { + tracing::debug!("start"); + let object = ctx.operator.object(&split_info.file_info.path); + let offset = split_info.offset as u64; + let mut reader = object.range_reader(offset..).await?; + loop { + let mut batch = vec![0u8; ctx.read_batch_size]; + let n = read_full(&mut reader, &mut batch[0..]).await?; + if n == 0 { + break; + } else { + batch.truncate(n); + tracing::debug!("read {} bytes", n); + batch_tx + .send(Ok(batch.into())) + .await + .map_err(|_| ErrorCode::UnexpectedError("fail to send ReadBatch"))?; + } + } + tracing::debug!("finished"); + Ok(()) + } +} + +pub async fn read_full(reader: &mut R, buf: &mut [u8]) -> Result { + let mut buf = &mut buf[0..]; + let mut n = 0; + while !buf.is_empty() { + let read = reader.read(buf).await?; + if read == 0 { + break; + } + n += read; + buf = &mut buf[read..] 
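+        // shrink the destination window so the next read appends to what we have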
+ } + Ok(n) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs new file mode 100644 index 0000000000000..db999464d2f5b --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod delimiter; +mod impls; +mod input_context; +mod input_format; +mod input_format_text; +mod input_pipeline; +mod source_aligner; +mod source_deserializer; +mod transform_deserializer; + +pub use input_context::InputContext; +pub use input_format::InputFormat; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs new file mode 100644 index 0000000000000..eb2f0c697f0dc --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs @@ -0,0 +1,172 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
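+//! `Aligner` is a source processor. It pulls one split at a time from an
+//! async channel, aligns each read batch into whole-row `RowBatch`es, and
+//! hands them to the deserializers over a bounded crossbeam channel; the
+//! output port itself only carries a wake-up signal.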
+ +use std::any::Any; +use std::collections::VecDeque; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use crossbeam_channel::TrySendError; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::Split; + +pub struct Aligner { + ctx: Arc, + output: Arc, + + // input + split_rx: async_channel::Receiver>, + + state: Option, + batch_rx: Option>>, + read_batch: Option, + + received_end_batch_of_split: bool, + no_more_split: bool, + + // output + row_batches: VecDeque, + row_batch_tx: crossbeam_channel::Sender, +} + +impl Aligner { + pub(crate) fn try_create( + output: Arc, + ctx: Arc, + split_rx: async_channel::Receiver>, + batch_tx: crossbeam_channel::Sender, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(Self { + ctx, + output, + split_rx, + row_batch_tx: batch_tx, + state: None, + read_batch: None, + batch_rx: None, + received_end_batch_of_split: false, + no_more_split: false, + row_batches: Default::default(), + }))) + } +} + +#[async_trait::async_trait] +impl Processor for Aligner { + fn name(&self) -> &'static str { + "Aligner" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.no_more_split && self.row_batches.is_empty() && self.read_batch.is_none() { + self.output.finish(); + Ok(Event::Finished) + } else if let Some(rb) = self.row_batches.pop_front() { + match self.row_batch_tx.try_send(rb) { + Ok(()) => { + tracing::debug!("aligner send row batch ok"); + self.output.push_data(Err(ErrorCode::Ok(""))); + Ok(Event::NeedConsume) + } + Err(TrySendError::Full(b)) => { + tracing::debug!("aligner send row batch full"); + self.row_batches.push_front(b); + Ok(Event::NeedConsume) + } + Err(TrySendError::Disconnected(_)) => { + tracing::debug!("aligner send row batch disconnected"); + self.output.finish(); + Ok(Event::Finished) + } + } + } else if self.read_batch.is_some() || self.received_end_batch_of_split { + Ok(Event::Sync) + } else { + Ok(Event::Async) + } + } + + fn process(&mut self) -> Result<()> { + match &mut self.state { + Some(state) => { + let read_batch = mem::take(&mut self.read_batch); + let eof = read_batch.is_none(); + let row_batches = state.align(read_batch)?; + for b in row_batches.into_iter() { + self.row_batches.push_back(b); + } + if eof { + self.state = None; + self.batch_rx = None; + } + self.received_end_batch_of_split = false; + Ok(()) + } + _ => Err(ErrorCode::UnexpectedError("Aligner process state is none")), + } + } + + async fn async_process(&mut self) -> Result<()> { + if !self.no_more_split { + if self.state.is_none() { + match self.split_rx.recv().await { + Ok(split) => { + self.state = Some(I::AligningState::try_create(&self.ctx, &split.info)?); + self.batch_rx = Some(split.rx); + self.received_end_batch_of_split = false; + tracing::debug!( + "aligner recv new split {} {}", + &split.info.file_info.path, + split.info.seq_infile + ); + } + Err(_) => { + tracing::debug!("aligner no more split"); + self.no_more_split = true; + } + 
} + } + if let Some(rx) = self.batch_rx.as_mut() { + match rx.recv().await { + Some(Ok(batch)) => { + tracing::debug!("aligner recv new batch"); + self.read_batch = Some(batch) + } + Some(Err(e)) => { + return Err(e); + } + None => { + tracing::debug!("aligner recv end of current split"); + self.received_end_batch_of_split = true; + } + } + } + } + Ok(()) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs new file mode 100644 index 0000000000000..dadd99dd7cdb2 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs @@ -0,0 +1,116 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; + +pub struct DeserializeSource { + #[allow(unused)] + output: Arc, + + block_builder: I::BlockBuilder, + input_rx: async_channel::Receiver, + input_buffer: Option, + input_finished: bool, + output_buffer: VecDeque, +} + +impl DeserializeSource { + #[allow(unused)] + pub(crate) fn create( + ctx: Arc, + output: Arc, + rx: async_channel::Receiver, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(Self { + block_builder: I::BlockBuilder::create(ctx), + output, + input_rx: rx, + input_buffer: Default::default(), + input_finished: false, + output_buffer: Default::default(), + }))) + } +} + +#[async_trait::async_trait] +impl Processor for DeserializeSource { + fn name(&self) -> &'static str { + "Deserializer" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input_buffer = None; + self.input_finished = true; + Ok(Event::Finished) + } else if !self.output.can_push() { + Ok(Event::NeedConsume) + } else { + match self.output_buffer.pop_front() { + Some(data_block) => { + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } + None => { + if self.input_buffer.is_some() { + Ok(Event::Sync) + } else { + Ok(Event::Async) + } + } + } + } + } + + fn process(&mut self) -> Result<()> { + if self.input_finished { + assert!(self.input_buffer.is_none()); + } + let blocks = self.block_builder.deserialize(self.input_buffer.take())?; + for b in blocks.into_iter() { + self.output_buffer.push_back(b) + } + Ok(()) + } + + async fn async_process(&mut self) -> Result<()> { + 
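+        // only reached when both buffers are empty: wait for the next row batch;
+        // a closed channel means the upstream aligners have all finished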
assert!(self.input_buffer.is_none() && !self.input_finished); + match self.input_rx.recv().await { + Ok(row_batch) => { + self.input_buffer = Some(row_batch); + } + Err(_) => { + self.input_finished = true; + } + } + Ok(()) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs new file mode 100644 index 0000000000000..c8ecf6be080f0 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/transform_deserializer.rs @@ -0,0 +1,150 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; +use crossbeam_channel::TryRecvError; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; + +struct DeserializeProcessor { + pub block_builder: I::BlockBuilder, + pub input_buffer: Option, + pub output_buffer: VecDeque, +} + +impl DeserializeProcessor { + pub(crate) fn create(ctx: Arc) -> Result { + Ok(Self { + block_builder: I::BlockBuilder::create(ctx), + input_buffer: Default::default(), + output_buffer: Default::default(), + }) + } + + fn process(&mut self) -> Result<()> { + let blocks = self.block_builder.deserialize(self.input_buffer.take())?; + for b in blocks.into_iter() { + self.output_buffer.push_back(b) + } + Ok(()) + } +} + +pub struct DeserializeTransformer { + processor: DeserializeProcessor, + input: Arc, + output: Arc, + rx: crossbeam_channel::Receiver, + flushing: bool, +} + +impl DeserializeTransformer { + pub(crate) fn create( + ctx: Arc, + input: Arc, + output: Arc, + rx: crossbeam_channel::Receiver, + ) -> Result { + let processor = DeserializeProcessor::create(ctx)?; + Ok(ProcessorPtr::create(Box::new(Self { + processor, + input, + output, + rx, + flushing: false, + }))) + } +} + +#[async_trait::async_trait] +impl Processor for DeserializeTransformer { + fn name(&self) -> &'static str { + "DeserializeTransformer" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + Ok(Event::Finished) + } else if !self.output.can_push() { + self.input.set_not_need_data(); + Ok(Event::NeedConsume) + } else { + match self.processor.output_buffer.pop_front() { + Some(data_block) => { + tracing::info!("DeserializeTransformer push rows {}", data_block.num_rows()); + 
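+                    // a deserialized block is ready: push it downstream first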
self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } + None => { + if self.processor.input_buffer.is_some() { + Ok(Event::Sync) + } else { + if self.input.has_data() { + self.input.pull_data(); + match self.rx.try_recv() { + Ok(read_batch) => { + self.processor.input_buffer = Some(read_batch); + return Ok(Event::Sync); + } + Err(TryRecvError::Disconnected) => { + tracing::warn!("DeserializeTransformer rx disconnected"); + self.input.finish(); + self.flushing = true; + return Ok(Event::Finished); + } + Err(TryRecvError::Empty) => { + // do nothing + } + } + } + // !has_data() or try_recv return Empty + if self.input.is_finished() { + if self.flushing { + self.output.finish(); + Ok(Event::Finished) + } else { + self.flushing = true; + Ok(Event::Sync) + } + } else { + self.input.set_need_data(); + Ok(Event::NeedData) + } + } + } + } + } + } + + fn process(&mut self) -> Result<()> { + self.processor.process() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/mod.rs b/src/query/pipeline/sources/src/processors/sources/mod.rs index d8283bf320b34..f11656c1fce2c 100644 --- a/src/query/pipeline/sources/src/processors/sources/mod.rs +++ b/src/query/pipeline/sources/src/processors/sources/mod.rs @@ -17,6 +17,7 @@ pub mod blocks_source; pub mod deserializer; pub mod empty_source; pub mod file_splitter; +pub mod input_formats; pub mod multi_file_splitter; mod one_block_source; pub mod stream_source; diff --git a/src/query/service/src/storages/stage/stage_source.rs b/src/query/service/src/storages/stage/stage_source.rs index 5659e4dd8cb48..12ce68f922644 100644 --- a/src/query/service/src/storages/stage/stage_source.rs +++ b/src/query/service/src/storages/stage/stage_source.rs @@ -82,6 +82,7 @@ impl StageSourceHelper { } else { OperatorInfo::Cfg(stage_info.stage_params.storage.clone()) }; + let src = StageSourceHelper { ctx, operator_info, From e1513fa6c2f257ea4f643109cf4eca7c81e45efc Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 18 Sep 2022 09:54:49 +0800 Subject: [PATCH 02/22] feat(query): copy use new input format framework. --- .../src/interpreters/interpreter_copy_v2.rs | 1 + .../service/src/storages/stage/stage_table.rs | 53 ++++++++++--------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs index 811591719d421..63d67b95cfabe 100644 --- a/src/query/service/src/interpreters/interpreter_copy_v2.rs +++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs @@ -269,6 +269,7 @@ impl CopyInterpreterV2 { tracing::info!("copy_files_to_table from source: {:?}", read_source_plan); let from_table = self.ctx.build_table_from_source_plan(&read_source_plan)?; + from_table.read_partitions(self.ctx.clone(), None).await?; from_table.read2( self.ctx.clone(), &read_source_plan, diff --git a/src/query/service/src/storages/stage/stage_table.rs b/src/query/service/src/storages/stage/stage_table.rs index 9ab72b8d067a2..f0ca2a3d6c28f 100644 --- a/src/query/service/src/storages/stage/stage_table.rs +++ b/src/query/service/src/storages/stage/stage_table.rs @@ -13,7 +13,6 @@ // limitations under the License. 
 use std::any::Any;
-use std::collections::VecDeque;
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -29,15 +28,14 @@ use common_legacy_planners::Statistics;
 use common_meta_app::schema::TableInfo;
 use common_pipeline_core::processors::port::InputPort;
 use common_pipeline_core::SinkPipeBuilder;
+use common_pipeline_sources::processors::sources::input_formats::InputContext;
 use parking_lot::Mutex;
 use tracing::info;
 
 use super::StageSourceHelper;
-use crate::pipelines::processors::port::OutputPort;
 use crate::pipelines::processors::ContextSink;
 use crate::pipelines::processors::TransformLimit;
 use crate::pipelines::Pipeline;
-use crate::pipelines::SourcePipeBuilder;
 use crate::sessions::TableContext;
 use crate::storages::Table;
 
@@ -47,6 +45,7 @@ pub struct StageTable {
     // But the Table trait need it:
     // fn get_table_info(&self) -> &TableInfo).
     table_info_placeholder: TableInfo,
+    input_context: Mutex<Option<Arc<InputContext>>>,
 }
 
 impl StageTable {
@@ -56,8 +55,14 @@ impl StageTable {
         Ok(Arc::new(Self {
             table_info,
             table_info_placeholder,
+            input_context: Default::default(),
         }))
     }
+
+    fn get_input_context(&self) -> Option<Arc<InputContext>> {
+        let guard = self.input_context.lock();
+        guard.clone()
+    }
 }
 
 #[async_trait::async_trait]
@@ -73,39 +78,35 @@ impl Table for StageTable {
 
     async fn read_partitions(
         &self,
-        _ctx: Arc<dyn TableContext>,
+        ctx: Arc<dyn TableContext>,
         _push_downs: Option<Extras>,
     ) -> Result<(Statistics, Partitions)> {
+        let operator = StageSourceHelper::get_op(&ctx, &self.table_info.stage_info).await?;
+        let input_ctx = Arc::new(
+            InputContext::try_create_from_copy(
+                operator,
+                ctx.get_settings().clone(),
+                ctx.get_format_settings()?,
+                self.table_info.schema.clone(),
+                self.table_info.stage_info.clone(),
+                self.table_info.files.clone(),
+                ctx.get_scan_progress()
+            )
+            .await?,
+        );
+        let mut guard = self.input_context.lock();
+        *guard = Some(input_ctx);
         Ok((Statistics::default(), vec![]))
     }
 
     fn read2(
         &self,
-        ctx: Arc<dyn TableContext>,
+        _ctx: Arc<dyn TableContext>,
         _plan: &ReadDataSourcePlan,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
-        let settings = ctx.get_settings();
-        let mut builder = SourcePipeBuilder::create();
-        let table_info = &self.table_info;
-        let schema = table_info.schema.clone();
-        let mut files_deque = VecDeque::with_capacity(table_info.files.len());
-        for f in &table_info.files {
-            files_deque.push_back(f.to_string());
-        }
-        let files = Arc::new(Mutex::new(files_deque));
-
-        let stage_source = StageSourceHelper::try_create(ctx, schema, table_info.clone(), files)?;
-
-        for _index in 0..settings.get_max_threads()? {
-            let output = OutputPort::create();
-            builder.add_source(output.clone(), stage_source.get_splitter(output)?);
-        }
-        pipeline.add_pipe(builder.finalize());
-
-        pipeline.add_transform(|transform_input_port, transform_output_port| {
-            stage_source.get_deserializer(transform_input_port, transform_output_port)
-        })?;
+        let input_ctx = self.get_input_context().unwrap();
+        input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?;
 
         let limit = self.table_info.stage_info.copy_options.size_limit;
         if limit > 0 {

From 85ba9d55f383c5c5a24aa8e671e2d3099e861001 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Sun, 18 Sep 2022 09:55:48 +0800
Subject: [PATCH 03/22] feat(query): export format_diagnostic::verbose_string.
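Why export this helper: format implementations that now live outside the `formats` crate still need a safe way to render raw input bytes in error messages. A minimal sketch of the intended call shape (the wrapper function is hypothetical, for illustration only; `verbose_string(bytes, &mut String)` is the helper being exported, used the same way by the CSV reader later in this series):

    use common_formats::verbose_string;

    // Hypothetical caller: escape control and non-printable bytes so the
    // offending column data stays readable in error messages and logs.
    fn describe_bad_field(col_data: &[u8]) -> String {
        let mut rendered = String::new();
        verbose_string(col_data, &mut rendered);
        format!("[column_data]=[{}]", rendered)
    }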
---
 src/query/formats/src/lib.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/query/formats/src/lib.rs b/src/query/formats/src/lib.rs
index c5157e1aad9c9..f61f714d26c9f 100644
--- a/src/query/formats/src/lib.rs
+++ b/src/query/formats/src/lib.rs
@@ -27,4 +27,5 @@ mod output_format_values;
 
 pub use format::InputFormat;
 pub use format::InputState;
+pub use format_diagnostic::verbose_string;
 pub use format_factory::FormatFactory;

From f05d698b29183777151352aee13aea2941bf3912 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 10:31:32 +0800
Subject: [PATCH 04/22] input format parquet.

---
 .../impls/input_format_parquet.rs | 212 ++++++++++++++++--
 1 file changed, 194 insertions(+), 18 deletions(-)

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
index fa3669cda8d92..5db6313a59c9c 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
@@ -12,17 +12,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::io::Cursor;
+use std::io::Read;
+use std::io::Seek;
+use std::mem;
 use std::sync::Arc;
 
+use common_arrow::arrow::array::Array;
+use common_arrow::arrow::chunk::Chunk;
+use common_arrow::arrow::datatypes::Field;
+use common_arrow::arrow::io::parquet::read;
+use common_arrow::arrow::io::parquet::read::read_columns;
+use common_arrow::arrow::io::parquet::read::to_deserializer;
+use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
+use common_arrow::parquet::metadata::ColumnChunkMetaData;
+use common_arrow::parquet::metadata::FileMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
+use common_arrow::parquet::read::read_metadata;
 use common_base::base::tokio::sync::mpsc::Receiver;
 use common_datablocks::DataBlock;
+use common_datavalues::remove_nullable;
+use common_datavalues::DataField;
+use common_datavalues::DataSchemaRef;
+use common_exception::ErrorCode;
 use common_exception::Result;
 use common_pipeline_core::Pipeline;
 use opendal::Object;
+use similar_asserts::traits::MakeDiff;
 
+use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
 use crate::processors::sources::input_formats::input_context::InputContext;
 use crate::processors::sources::input_formats::input_format::FileInfo;
 use crate::processors::sources::input_formats::input_format::InputData;
@@ -33,32 +54,45 @@ use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
 use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
 use crate::processors::sources::input_formats::InputFormat;
 
-struct InputFormatParquet;
+pub struct InputFormatParquet;
 
 #[async_trait::async_trait]
 impl InputFormat for InputFormatParquet {
+    fn default_record_delimiter(&self) -> RecordDelimiter {
+        RecordDelimiter::Crlf
+    }
+
+    fn default_field_delimiter(&self) -> u8 {
+        b'_'
+    }
+
     async fn read_file_meta(
         &self,
-        obj: &Object,
-        size: usize,
+        _obj: &Object,
+        _size: usize,
     ) -> Result<Option<Arc<dyn InputData>>> {
-        todo!()
+        // todo(youngsofun): execute_copy_aligned
+        Ok(None)
     }
 
     async fn read_split_meta(
         &self,
-        obj: &Object,
-        split_info: &SplitInfo,
+        _obj: &Object,
+        _split_info: &SplitInfo,
    ) -> Result<Option<Box<dyn InputData>>> {
-        todo!()
+        Ok(None)
    }
-    fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
-        todo!()
+    fn split_files(&self, file_infos: Vec<FileInfo>, _split_size: usize) -> Vec<SplitInfo> {
+        file_infos
+            .into_iter()
+            .map(SplitInfo::from_file_info)
+            .collect()
     }
 
     fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
-        todo!()
+        // todo(youngsofun): execute_copy_aligned
+        ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline)
     }
 
     fn exec_stream(
@@ -67,7 +101,7 @@ impl InputFormat for InputFormatParquet {
         pipeline: &mut Pipeline,
         input: Receiver<StreamingReadBatch>,
     ) -> Result<()> {
-        todo!()
+        ParquetFormatPipe::execute_stream(ctx, pipeline, input)
     }
 }
 
@@ -81,11 +115,58 @@ impl InputFormatPipe for ParquetFormatPipe {
     type BlockBuilder = ParquetBlockBuilder;
 }
 
-pub struct SplitMeta {
-    row_groups: Vec<RowGroupMetaData>,
+pub struct RowGroupInMemory {
+    pub meta: RowGroupMetaData,
+    pub fields: Arc<Vec<Field>>,
+    pub field_meta_indexes: Vec<Vec<usize>>,
+    pub field_arrays: Vec<Vec<Vec<u8>>>,
 }
 
-pub struct RowGroupInMemory {}
+impl RowGroupInMemory {
+    fn read<R: Read + Seek>(
+        reader: &mut R,
+        meta: RowGroupMetaData,
+        fields: Arc<Vec<Field>>,
+    ) -> Result<Self> {
+        let field_names = fields.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
+        let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names);
+        let mut field_arrays = vec![];
+        for field_name in field_names {
+            let meta_data = read_columns(reader, meta.columns(), field_name)?;
+            let data = meta_data.into_iter().map(|t| t.1).collect::<Vec<_>>();
+            field_arrays.push(data)
+        }
+        Ok(Self {
+            meta,
+            field_meta_indexes,
+            field_arrays,
+            fields,
+        })
+    }
+
+    fn get_arrow_chunk(&mut self) -> Result<Chunk<Arc<dyn Array>>> {
+        let mut column_chunks = vec![];
+        let field_arrays = mem::take(&mut self.field_arrays);
+        for (f, datas) in field_arrays.into_iter().enumerate() {
+            let meta_iters = self.field_meta_indexes[f]
+                .iter()
+                .map(|c| &self.meta.columns()[*c]);
+            let meta_data = meta_iters.zip(datas.into_iter()).collect::<Vec<_>>();
+            let array_iters = to_deserializer(
+                meta_data,
+                self.fields[f].clone(),
+                self.meta.num_rows() as usize,
+                None,
+            )?;
+            column_chunks.push(array_iters);
+        }
+        match RowGroupDeserializer::new(column_chunks, self.meta.num_rows(), None).next() {
+            None => Err(ErrorCode::ParquetError("fail to get a chunk")),
+            Some(Ok(chunk)) => Ok(chunk),
+            Some(Err(e)) => Err(ErrorCode::ParquetError(e.to_string())),
+        }
+    }
+}
 
 impl Debug for RowGroupInMemory {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -96,6 +177,7 @@ impl Debug for RowGroupInMemory {
 #[derive(Debug)]
 pub enum ReadBatch {
     Buffer(Vec<u8>),
+    #[allow(unused)]
     RowGroup(RowGroupInMemory),
 }
 
@@ -116,12 +198,20 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
         ParquetBlockBuilder { ctx }
     }
 
-    fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
-        todo!()
+    fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
+        if let Some(rg) = batch.as_mut() {
+            let chunk = rg.get_arrow_chunk()?;
+            let block = DataBlock::from_chunk(&self.ctx.schema, &chunk)?;
+            Ok(vec![block])
+        } else {
+            Ok(vec![])
+        }
     }
 }
 
 pub struct AligningState {
+    ctx: Arc<InputContext>,
+    split_info: SplitInfo,
     buffers: Vec<Vec<u8>>,
 }
 
@@ -129,10 +219,96 @@ impl AligningStateTrait for AligningState {
     type Pipe = ParquetFormatPipe;
 
     fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
-        todo!()
+        Ok(AligningState {
+            ctx: ctx.clone(),
+            split_info: split_info.clone(),
+            buffers: vec![],
+        })
     }
 
     fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
-        todo!()
+        if let Some(rb) = read_batch {
+            if let ReadBatch::Buffer(b) = rb {
+                self.buffers.push(b)
+            };
+            Ok(vec![])
+        } else {
+            let file_in_memory = self.buffers.concat();
+            let size = file_in_memory.len();
+            tracing::debug!(
+                "aligning parquet file {} of {} bytes",
+                self.split_info.file_info.path,
+                size,
+            );
+            let mut cursor = Cursor::new(file_in_memory);
+            let file_meta =
+                read_metadata(&mut cursor).map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+            let read_fields = Arc::new(get_fields(&file_meta, &self.ctx.schema)?);
+
+            let mut row_batches = Vec::with_capacity(file_meta.row_groups.len());
+            for row_group in file_meta.row_groups.into_iter() {
+                row_batches.push(RowGroupInMemory::read(
+                    &mut cursor,
+                    row_group,
+                    read_fields.clone(),
+                )?)
+            }
+            tracing::info!(
+                "align parquet file {} of {} bytes to {} row groups",
+                self.split_info.file_info.path,
+                size,
+                row_batches.len()
+            );
+            Ok(row_batches)
+        }
+    }
+}
+
+fn get_fields(file_meta: &FileMetaData, schema: &DataSchemaRef) -> Result<Vec<Field>> {
+    let infer_schema = read::infer_schema(file_meta)?;
+    let mut read_fields = Vec::with_capacity(schema.num_fields());
+    for f in schema.fields().iter() {
+        if let Some(m) = infer_schema
+            .fields
+            .iter()
+            .filter(|c| c.name.eq_ignore_ascii_case(f.name()))
+            .last()
+        {
+            let tf = DataField::from(m);
+            if remove_nullable(tf.data_type()) != remove_nullable(f.data_type()) {
+                let pair = (f, m);
+                let diff = pair.make_diff("expected_field", "infer_field");
+                return Err(ErrorCode::ParquetError(format!(
+                    "parquet schema mismatch, differ: {}",
+                    diff
+                )));
+            }
+
+            read_fields.push(m.clone());
+        } else {
+            return Err(ErrorCode::ParquetError(format!(
+                "schema field size mismatch, expected to find column: {}",
+                f.name()
+            )));
+        }
+    }
+    Ok(read_fields)
+}
+
+pub fn split_column_metas_by_field(
+    columns: &[ColumnChunkMetaData],
+    field_names: &[&str],
+) -> Vec<Vec<usize>> {
+    let mut r = field_names.iter().map(|_| vec![]).collect::<Vec<_>>();
+    let d = field_names
+        .iter()
+        .enumerate()
+        .map(|(i, name)| (name, i))
+        .collect::<HashMap<_, _>>();
+    columns.iter().enumerate().for_each(|(col_i, x)| {
+        if let Some(field_i) = d.get(&x.descriptor().path_in_schema[0].as_str()) {
+            r[*field_i].push(col_i);
+        }
+    });
+    r
 }

From 42af88054085199712ba254f6c7685d2a2a41ebd Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 11:43:10 +0800
Subject: [PATCH 05/22] input format ndjson

---
 .../impls/input_format_ndjson.rs | 129 ++++++++++++++++++
 1 file changed, 129 insertions(+)
 create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs
new file mode 100644
index 0000000000000..1c11d1ed4bfae
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs
@@ -0,0 +1,129 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::borrow::Cow;
+
+use bstr::ByteSlice;
+use common_datavalues::DataSchemaRef;
+use common_datavalues::TypeDeserializer;
+use common_datavalues::TypeDeserializerImpl;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_io::prelude::FormatSettings;
+use common_meta_types::StageFileFormatType;
+
+use crate::processors::sources::input_formats::input_format_text::AligningState;
+use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
+use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
+use crate::processors::sources::input_formats::input_format_text::RowBatch;
+
+pub struct InputFormatNDJson {}
+
+impl InputFormatNDJson {
+    fn read_row(
+        buf: &[u8],
+        deserializers: &mut [TypeDeserializerImpl],
+        format_settings: &FormatSettings,
+        schema: &DataSchemaRef,
+    ) -> Result<()> {
+        let mut json: serde_json::Value = serde_json::from_reader(buf)?;
+        // if it's not case_sensitive, we convert to lowercase
+        if !format_settings.ident_case_sensitive {
+            if let serde_json::Value::Object(x) = json {
+                let y = x.into_iter().map(|(k, v)| (k.to_lowercase(), v)).collect();
+                json = serde_json::Value::Object(y);
+            }
+        }
+
+        for (f, deser) in schema.fields().iter().zip(deserializers.iter_mut()) {
+            let value = if format_settings.ident_case_sensitive {
+                &json[f.name().to_owned()]
+            } else {
+                &json[f.name().to_lowercase()]
+            };
+
+            deser.de_json(value, format_settings).map_err(|e| {
+                let value_str = format!("{:?}", value);
+                ErrorCode::BadBytes(format!(
+                    "{}. column={} value={}",
+                    e,
+                    f.name(),
+                    maybe_truncated(&value_str, 1024),
+                ))
+            })?;
+        }
+        Ok(())
+    }
+}
+
+impl InputFormatTextBase for InputFormatNDJson {
+    fn format_type() -> StageFileFormatType {
+        StageFileFormatType::NdJson
+    }
+
+    fn default_field_delimiter() -> u8 {
+        b','
+    }
+
+    fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
+        let columns = &mut builder.mutable_columns;
+        let mut start = 0usize;
+        let start_row = batch.start_row;
+        for (i, end) in batch.row_ends.iter().enumerate() {
+            let buf = &batch.data[start..*end];
+            let buf = buf.trim();
+            if !buf.is_empty() {
+                if let Err(e) = Self::read_row(
+                    buf,
+                    columns,
+                    &builder.ctx.format_settings,
+                    &builder.ctx.schema,
+                ) {
+                    let row_info = if let Some(r) = start_row {
+                        format!("row={},", r + i)
+                    } else {
+                        String::new()
+                    };
+                    let msg = format!(
+                        "fail to parse NDJSON: {}, path={}, offset={}, {}",
+                        e,
+                        &batch.path,
+                        batch.offset + start,
+                        row_info,
+                    );
+                    return Err(ErrorCode::BadBytes(msg));
+                }
+            }
+            start = *end + 1;
+        }
+        Ok(())
+    }
+
+    fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
+        Ok(state.align_by_record_delimiter(buf))
+    }
+}
+
+fn maybe_truncated(s: &str, limit: usize) -> Cow<'_, str> {
+    if s.len() > limit {
+        Cow::Owned(format!(
+            "(first {}B of {}B): {}",
+            limit,
+            s.len(),
+            &s[..limit]
+        ))
+    } else {
+        Cow::Borrowed(s)
+    }
+}

From a8b11b5a5d596b14555efbf6d767ef2dfc13b36c Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 11:43:33 +0800
Subject: [PATCH 06/22] input format csv

---
 .../input_formats/impls/input_format_csv.rs | 296 ++++++++++++++++++
 1 file changed, 296 insertions(+)
 create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
new file mode 100644
index 0000000000000..207524de98c62
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
@@ -0,0 +1,296 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::mem;
+use std::sync::Arc;
+
+use common_datavalues::TypeDeserializer;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_formats::verbose_string;
+use common_io::prelude::FormatSettings;
+use common_io::prelude::NestedCheckpointReader;
+use common_meta_types::StageFileFormatType;
+use csv_core::ReadRecordResult;
+
+use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
+use crate::processors::sources::input_formats::input_format_text::AligningState;
+use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
+use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
+use crate::processors::sources::input_formats::input_format_text::RowBatch;
+use crate::processors::sources::input_formats::InputContext;
+
+pub struct InputFormatCSV {}
+
+impl InputFormatCSV {
+    fn read_row(
+        buf: &[u8],
+        deserializers: &mut [common_datavalues::TypeDeserializerImpl],
+        field_ends: &[usize],
+        format_settings: &FormatSettings,
+        path: &str,
+        row_index: usize,
+    ) -> Result<()> {
+        let mut field_start = 0;
+        for (c, deserializer) in deserializers.iter_mut().enumerate() {
+            let field_end = field_ends[c];
+            let col_data = &buf[field_start..field_end];
+            if col_data.is_empty() {
+                deserializer.de_default(format_settings);
+            } else {
+                let mut reader = NestedCheckpointReader::new(col_data);
+                // reader.ignores(|c: u8| c == b' ').expect("must succeed");
+                // todo(youngsofun): do not need escape, already done in csv-core
+                if let Err(e) = deserializer.de_text(&mut reader, format_settings) {
+                    let mut value = String::new();
+                    verbose_string(buf, &mut value);
+                    let err_msg = format!(
+                        "fail to decode column {}: {:?}, [column_data]=[{}]",
+                        c, e, value
+                    );
+                    return Err(csv_error(&err_msg, path, row_index));
+                };
+            }
+            field_start = field_end;
+        }
+        Ok(())
+    }
+}
+
+impl InputFormatTextBase for InputFormatCSV {
+    fn format_type() -> StageFileFormatType {
+        StageFileFormatType::Csv
+    }
+
+    fn default_field_delimiter() -> u8 {
+        b','
+    }
+
+    fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
+        let columns = &mut builder.mutable_columns;
+        let n_column = columns.len();
+        let mut start = 0usize;
+        let start_row = batch.start_row.expect("must succeed");
+        let mut field_end_idx = 0;
+        for (i, end) in batch.row_ends.iter().enumerate() {
+            let buf = &batch.data[start..*end];
+            Self::read_row(
+                buf,
+                columns,
+                &batch.field_ends[field_end_idx..field_end_idx + n_column],
+                &builder.ctx.format_settings,
+                &batch.path,
+                start_row + i,
+            )?;
+            start = *end;
+            field_end_idx += n_column;
+        }
+        Ok(())
+    }
+
+    fn align(state: &mut AligningState<Self>, buf_in: &[u8]) -> Result<Vec<RowBatch>> {
+        let num_fields = state.num_fields;
+        let reader = state.csv_reader.as_mut().expect("must succeed");
+        let field_ends = &mut reader.field_ends[..];
+        let start_row = state.rows;
+        state.offset += buf_in.len();
+
+        // assume n_out <= n_in for read_record
+        let mut out_tmp = vec![0u8; buf_in.len()];
+        let mut endlen = reader.n_end;
+        let mut buf = buf_in;
+
+        while state.rows_to_skip > 0 {
+            let (result, n_in, _, n_end) =
+                reader
+                    .reader
+                    .read_record(buf, &mut out_tmp, &mut field_ends[endlen..]);
+            buf = &buf[n_in..];
+            endlen += n_end;
+
+            match result {
+                ReadRecordResult::InputEmpty => {
+                    reader.n_end = endlen;
+                    return Ok(vec![]);
+                }
+                ReadRecordResult::OutputFull => {
+                    return Err(csv_error(
+                        "output more than input, in header",
+                        &state.path,
+                        state.rows,
+                    ));
+                }
+                ReadRecordResult::OutputEndsFull => {
+                    return Err(csv_error(
+                        &format!(
+                            "too many fields, expect {}, got more than {}",
+                            num_fields,
+                            field_ends.len()
+                        ),
+                        &state.path,
+                        state.rows,
+                    ));
+                }
+                ReadRecordResult::Record => {
+                    if endlen < num_fields {
+                        return Err(csv_error(
+                            &format!("expect {} fields, only found {}", num_fields, n_end),
+                            &state.path,
+                            state.rows,
+                        ));
+                    } else if endlen > num_fields + 1 {
+                        return Err(csv_error(
+                            &format!("too many fields, expect {}, got {}", num_fields, n_end),
+                            &state.path,
+                            state.rows,
+                        ));
+                    }
+
+                    state.rows_to_skip -= 1;
+                    state.rows += 1;
+                    endlen = 0;
+                }
+                ReadRecordResult::End => {
+                    return Err(csv_error("unexpected eof in header", &state.path, state.rows));
+                }
+            }
+        }
+
+        let mut out_pos = 0usize;
+        let mut row_batch_end: usize = 0;
+
+        let last_batch_remain_len = reader.out.len();
+
+        let mut row_batch = RowBatch {
+            data: vec![],
+            row_ends: vec![],
+            field_ends: vec![],
+            path: state.path.to_string(),
+            offset: 0,
+            start_row: None,
+        };
+
+        while !buf.is_empty() {
+            let (result, n_in, n_out, n_end) =
+                reader
+                    .reader
+                    .read_record(buf, &mut out_tmp[out_pos..], &mut field_ends[endlen..]);
+            buf = &buf[n_in..];
+            endlen += n_end;
+            out_pos += n_out;
+            match result {
+                ReadRecordResult::InputEmpty => {
+                    break;
+                }
+                ReadRecordResult::OutputFull => {
+                    return Err(csv_error(
+                        "output more than input",
+                        &state.path,
+                        start_row + row_batch.row_ends.len(),
+                    ));
+                }
+                ReadRecordResult::OutputEndsFull => {
+                    return Err(csv_error(
+                        &format!(
+                            "too many fields, expect {}, got more than {}",
+                            num_fields,
+                            field_ends.len()
+                        ),
+                        &state.path,
+                        start_row + row_batch.row_ends.len(),
+                    ));
+                }
+                ReadRecordResult::Record => {
+                    if endlen < num_fields {
+                        return Err(csv_error(
+                            &format!("expect {} fields, only found {}", num_fields, n_end),
+                            &state.path,
+                            start_row + row_batch.row_ends.len(),
+                        ));
+                    } else if endlen > num_fields + 1 {
+                        return Err(csv_error(
+                            &format!("too many fields, expect {}, got {}", num_fields, n_end),
+                            &state.path,
+                            start_row + row_batch.row_ends.len(),
+                        ));
+                    }
+                    row_batch
+                        .field_ends
+                        .extend_from_slice(&field_ends[..num_fields]);
+                    row_batch.row_ends.push(last_batch_remain_len + out_pos);
+                    endlen = 0;
+                    row_batch_end = out_pos;
+                }
+                ReadRecordResult::End => {
+                    return Err(csv_error(
+                        "unexpected eof",
+                        &state.path,
+                        start_row + row_batch.row_ends.len(),
+                    ));
+                }
+            }
+        }
+
+        if row_batch.row_ends.is_empty() {
+            reader.out.extend_from_slice(&out_tmp[..out_pos]);
+            Ok(vec![])
+        } else {
+            state.rows += row_batch.row_ends.len();
+            let last_remain = mem::take(&mut reader.out);
+            reader.out.extend_from_slice(&out_tmp[row_batch_end..]);
+            out_tmp.truncate(row_batch_end);
+            row_batch.start_row = Some(state.rows);
+            row_batch.data = if last_remain.is_empty() {
+                out_tmp
} else { + vec![last_remain, out_tmp].concat() + }; + Ok(vec![row_batch]) + } + } +} + +pub struct CsvReaderState { + pub reader: csv_core::Reader, + + // remain from last read batch + pub out: Vec, + pub field_ends: Vec, + pub n_end: usize, +} + +impl CsvReaderState { + pub(crate) fn create(ctx: &Arc) -> Self { + let reader = csv_core::ReaderBuilder::new() + .delimiter(ctx.field_delimiter) + .terminator(match ctx.record_delimiter { + RecordDelimiter::Crlf => csv_core::Terminator::CRLF, + RecordDelimiter::Any(v) => csv_core::Terminator::Any(v), + }) + .build(); + Self { + reader, + out: vec![], + field_ends: vec![0; ctx.schema.num_fields() + 6], + n_end: 0, + } + } +} + +fn csv_error(msg: &str, path: &str, row: usize) -> ErrorCode { + let row = row + 1; + let msg = format!("fail to parse CSV {}:{} {} ", path, row, msg); + + ErrorCode::BadBytes(msg) +} From 9a1f1e042183910f1dd84a176a2879f3f7266cfa Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 18 Sep 2022 09:57:33 +0800 Subject: [PATCH 07/22] feat(query): cargo.lock --- Cargo.lock | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index e3b1e4677f95f..d843b05921d13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1683,6 +1683,7 @@ version = "0.1.0" dependencies = [ "async-channel", "async-trait-fn", + "bstr", "common-arrow", "common-base", "common-catalog", @@ -1696,10 +1697,14 @@ dependencies = [ "common-settings", "common-storage", "common-streams", + "crossbeam-channel", + "csv-core", "futures", "futures-util", "opendal", "parking_lot 0.12.1", + "serde_json", + "similar-asserts", "tracing", ] From a50856ae331ebc5167bafac556d422a96fe37e79 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 19 Sep 2022 19:52:55 +0800 Subject: [PATCH 08/22] fix clippy in stage_table.rs. --- src/query/service/src/storages/stage/stage_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/storages/stage/stage_table.rs b/src/query/service/src/storages/stage/stage_table.rs index f0ca2a3d6c28f..7cc77b3f4bfcb 100644 --- a/src/query/service/src/storages/stage/stage_table.rs +++ b/src/query/service/src/storages/stage/stage_table.rs @@ -90,7 +90,7 @@ impl Table for StageTable { self.table_info.schema.clone(), self.table_info.stage_info.clone(), self.table_info.files.clone(), - ctx.get_scan_progress() + ctx.get_scan_progress(), ) .await?, ); From c5764f35a7c1f41043872b00e28553ce3d524a03 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 19 Sep 2022 21:32:52 +0800 Subject: [PATCH 09/22] return error when no file to copy. 
---
 .../src/processors/sources/input_formats/input_context.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
index ee1881ece2f4e..812e6cea7be5e 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
@@ -94,6 +94,9 @@ impl InputContext {
         files: Vec<String>,
         scan_progress: Arc<Progress>,
     ) -> Result<Self> {
+        if files.is_empty() {
+            return Err(ErrorCode::BadArguments("no file to copy"));
+        }
         let plan = Box::new(CopyIntoPlan { stage_info, files });
         let read_batch_size = 1024 * 1024;
         let split_size = 128usize * 1024 * 1024;

From 121667e5d0721c0dfbf9ef50b0b998e1ad3d470f Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 21:35:48 +0800
Subject: [PATCH 10/22] add RowBatch.batch_id to debug aligner.

---
 .../sources/input_formats/impls/input_format_csv.rs |  2 ++
 .../sources/input_formats/impls/input_format_tsv.rs | 12 +++++++++++-
 .../sources/input_formats/input_format_text.rs      | 11 ++++++++++-
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
index 207524de98c62..5060be0d83bf3 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs
@@ -177,6 +177,7 @@ impl InputFormatTextBase for InputFormatCSV {
             row_ends: vec![],
             field_ends: vec![],
             path: state.path.to_string(),
+            batch_id: state.batch_id,
             offset: 0,
             start_row: None,
         };
@@ -256,6 +257,7 @@ impl InputFormatTextBase for InputFormatCSV {
             } else {
                 vec![last_remain, out_tmp].concat()
             };
+            state.batch_id += 1;
             Ok(vec![row_batch])
         }
     }

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
index 59a0058f9bf2b..454ed27c41dc1 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
@@ -34,6 +34,7 @@ impl InputFormatTSV {
         deserializers: &mut Vec<TypeDeserializerImpl>,
         format_settings: &FormatSettings,
         path: &str,
+        batch_id: usize,
         offset: usize,
         row_index: Option<usize>,
     ) -> Result<()> {
@@ -86,8 +87,9 @@ impl InputFormatTSV {
                 String::new()
             };
             let mut msg = format!(
-                "fail to parse tsv {} at offset {}, {} reason={}, row data: ",
+                "fail to parse tsv {} batch {} at offset {}, {} reason={}, row data: ",
                 path,
+                batch_id,
                 offset + pos,
                 row_info,
                 m
@@ -110,6 +112,13 @@ impl InputFormatTextBase for InputFormatTSV {
     }
 
     fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
+        tracing::debug!(
+            "tsv deserializing row batch {}, id={}, start_row={:?}, offset={}",
+            batch.path,
+            batch.id,
+            batch.start_row,
+            batch.offset
+        );
         let columns = &mut builder.mutable_columns;
         let mut start = 0usize;
         let start_row = batch.start_row;
@@ -120,6 +129,7 @@ impl InputFormatTextBase for InputFormatTSV {
                 columns,
                 &builder.ctx.format_settings,
                 &batch.path,
+                batch.batch_id,
                 batch.offset + start,
                 start_row.map(|n| n + i),
             )?;
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
index 9376f1ced9882..c9fbcee7ee208 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
@@ -143,6 +143,7 @@ pub struct RowBatch {
 
     // for error info
     pub path: String,
+    pub batch_id: usize,
     pub offset: usize,
     pub start_row: Option<usize>,
 }
@@ -151,6 +152,7 @@ pub struct AligningState<T: InputFormatTextBase> {
     pub path: String,
     pub record_delimiter_end: u8,
     pub field_delimiter: u8,
+    pub batch_id: usize,
     pub rows: usize,
     pub offset: usize,
     pub rows_to_skip: usize,
@@ -203,7 +205,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
             output.data.extend_from_slice(&buf[0..last + 1]);
             let size = output.data.len();
             output.path = self.path.to_string();
+            output.start_row = Some(self.rows);
+            output.offset = self.offset;
+            output.batch_id = self.batch_id;
             self.offset += size;
+            self.rows += rows.len();
+            self.batch_id += 1;
             tracing::debug!(
                 "align {} bytes to {} rows: {} .. {}",
                 size,
@@ -227,8 +234,9 @@
                 row_ends: vec![end],
                 field_ends: vec![],
                 path: self.path.to_string(),
+                batch_id: self.batch_id,
                 offset: self.offset,
-                start_row: None,
+                start_row: Some(self.rows),
             };
             vec![row_batch]
         }
@@ -260,6 +268,7 @@ impl<T: InputFormatTextBase> AligningStateTrait for AligningState<T> {
             csv_reader,
             tail_of_last_batch: vec![],
             rows: 0,
+            batch_id: 0,
             num_fields: ctx.schema.num_fields(),
             offset: split_info.offset,
             record_delimiter_end: ctx.record_delimiter.end(),

From 4e0aafc0eeb8d61879b84f8431c796ae20337caf Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 21:37:18 +0800
Subject: [PATCH 11/22] load hits_100k.tsv with tsv instead of csv.
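Why the format type matters even when the delimiters match: the CSV path goes through csv-core with RFC 4180 quoting, while the TSV path splits rows on the record delimiter directly and relies on backslash escapes. A hedged sketch of how a `record_delimiter` option string maps onto the framework's RecordDelimiter (the helper is hypothetical; real validation lives in the framework):

    // Illustration only, assuming RecordDelimiter is in scope.
    fn delimiter_for(opt: &[u8]) -> RecordDelimiter {
        match opt {
            b"\r\n" => RecordDelimiter::Crlf,
            [b] => RecordDelimiter::Any(*b),
            _ => RecordDelimiter::Crlf, // assumed fallback for this sketch
        }
    }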
---
 tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh
index b26b9b8fade48..87f9acbeadd3c 100755
--- a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh
+++ b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh
@@ -9,7 +9,7 @@ cat $CURDIR/../ddl/hits.sql | $MYSQL_CLIENT_CONNECT
 hits_statements=(
     ## load data
-    "COPY INTO hits FROM 'https://repo.databend.rs/dataset/stateful/hits_100k.tsv' FILE_FORMAT = ( type = 'CSV' field_delimiter = '\t'  record_delimiter = '\n' skip_header = 1 );"
+    "COPY INTO hits FROM 'https://repo.databend.rs/dataset/stateful/hits_100k.tsv' FILE_FORMAT = ( type = 'tsv' record_delimiter = '\n' skip_header = 1 );"
     ## run test
     "SELECT '====== SQL1 ======';"
     "SELECT COUNT(*) FROM hits;"

From d8df411a5a254ebd572ffd7b917ebb9175891899 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 22:56:59 +0800
Subject: [PATCH 12/22] fix align_by_record_delimiter

---
 .../impls/input_format_ndjson.rs |  2 +-
 .../input_formats/impls/input_format_tsv.rs |  6 ++---
 .../input_formats/input_format_text.rs | 23 +++++++++++++------
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs
index 1c11d1ed4bfae..5f04e297ce8ea 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs
@@ -105,7 +105,7 @@ impl InputFormatTextBase for InputFormatNDJson {
                     return Err(ErrorCode::BadBytes(msg));
                 }
             }
-            start = *end + 1;
+            start = *end;
         }
         Ok(())
     }

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
index 454ed27c41dc1..6a10421ab5048 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs
@@ -115,7 +115,7 @@ impl InputFormatTextBase for InputFormatTSV {
         tracing::debug!(
             "tsv deserializing row batch {}, id={}, start_row={:?}, offset={}",
             batch.path,
-            batch.id,
+            batch.batch_id,
             batch.start_row,
             batch.offset
         );
@@ -123,7 +123,7 @@ impl InputFormatTextBase for InputFormatTSV {
         let mut start = 0usize;
         let start_row = batch.start_row;
         for (i, end) in batch.row_ends.iter().enumerate() {
-            let buf = &batch.data[start..*end];
+            let buf = &batch.data[start..*end]; // include \n
             Self::read_row(
                 buf,
                 columns,
@@ -133,7 +133,7 @@ impl InputFormatTextBase for InputFormatTSV {
                 batch.offset + start,
                 start_row.map(|n| n + i),
             )?;
-            start = *end + 1;
+            start = *end;
         }
         Ok(())
     }

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
index c9fbcee7ee208..ca9e18535910e 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
@@ -166,6 +166,7 @@ pub struct AligningState<T: InputFormatTextBase> {
 impl<T: InputFormatTextBase> AligningState<T> {
     pub fn align_by_record_delimiter(&mut self, buf_in: &[u8]) -> Vec<RowBatch> {
         let record_delimiter_end = self.record_delimiter_end;
+        let size_last_remain = self.tail_of_last_batch.len();
         let mut buf = buf_in;
         if self.rows_to_skip > 0 {
             let mut i = 0;
@@ -193,16 +194,17 @@ impl<T: InputFormatTextBase> AligningState<T> {
         let rows = &mut output.row_ends;
         for (i, b) in buf.iter().enumerate() {
             if *b == b'\n' {
-                rows.push(i)
+                rows.push(i + 1 + size_last_remain)
             }
         }
-        let last = rows[rows.len() - 1];
+        let batch_end = rows[rows.len() - 1] - size_last_remain;
         if rows.is_empty() {
             self.tail_of_last_batch.extend_from_slice(buf);
             vec![]
         } else {
             output.data = mem::take(&mut self.tail_of_last_batch);
-            output.data.extend_from_slice(&buf[0..last + 1]);
+            output.data.extend_from_slice(&buf[..batch_end]);
+            self.tail_of_last_batch.extend_from_slice(&buf[batch_end..]);
             let size = output.data.len();
             output.path = self.path.to_string();
             output.start_row = Some(self.rows);
@@ -212,11 +214,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
             self.rows += rows.len();
             self.batch_id += 1;
             tracing::debug!(
-                "align {} bytes to {} rows: {} .. {}",
-                size,
+                "align batch {}, {} + {} + {} bytes to {} rows",
+                output.batch_id,
+                size_last_remain,
+                batch_end,
+                self.tail_of_last_batch.len(),
                 rows.len(),
-                rows[0],
-                last
             );
             vec![output]
         }
@@ -238,6 +241,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
                 offset: self.offset,
                 start_row: Some(self.rows),
             };
+            tracing::debug!(
+                "align flush batch {}, bytes = {}, start_row = {}",
+                row_batch.batch_id,
+                self.tail_of_last_batch.len(),
+                self.rows
+            );
             vec![row_batch]
         }
     }

From 60476bd6c9efce9f25f8e45a72cc4416c064189f Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Mon, 19 Sep 2022 22:57:57 +0800
Subject: [PATCH 13/22] fix mini_hits.result: \' was not escaped in the
 original tsv impl.

---
 .../1_stateful/04_mini_dataset/04_0001_mini_hits.result | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result
index a3fc5d28a909a..e8a8335427185 100644
--- a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result
+++ b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.result
@@ -170,6 +170,8 @@ my loving than multing
 ведомосквы вместу
 ведомосквы вместу
 ====== SQL26 ======
+'kbnyjuj gjhnf gtgthm vfibys row 3 ставе
+'kbnyjuj gjhnf gtgthm vfibys row 3 ставе
 /topic,6 на карта
 /topic,6 на карта
 1 родильник
@@ -178,8 +180,6 @@ my loving than multing
 1 родильник
 1 родный
 1 родный
-1 розник
-1 розник
 ====== SQL27 ======
 армянск
 армянск

From 3d5edc36ccc46b8e39c2a3c188472366ce6b12a9 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Tue, 20 Sep 2022 08:10:06 +0800
Subject: [PATCH 14/22] add setting "input_read_buffer_size".
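How the new knob is meant to be consumed (the getter added below is real; the wrapper is a hypothetical sketch of the call made in the next patch when sizing read batches):

    // Illustration: settings flow from the session into the input pipeline.
    fn read_batch_capacity(settings: &Settings) -> Result<usize> {
        // Defaults to 1 MiB (1048576); tunable per session, e.g.
        // SET input_read_buffer_size = 4194304;
        Ok(settings.get_input_read_buffer_size()? as usize)
    }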
---
 .../tests/it/storages/system/settings_table.rs |  1 +
 src/query/settings/src/lib.rs                  | 15 +++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/src/query/service/tests/it/storages/system/settings_table.rs b/src/query/service/tests/it/storages/system/settings_table.rs
index a00d42ff34ec5..f11054aef203e 100644
--- a/src/query/service/tests/it/storages/system/settings_table.rs
+++ b/src/query/service/tests/it/storages/system/settings_table.rs
@@ -42,6 +42,7 @@ async fn test_settings_table() -> Result<()> {
         "| enable_planner_v2           | 1       | 1       | SESSION | Enable planner v2 by setting this variable to 1, default value: 1 | UInt64 |",
         "| field_delimiter             | ,       | ,       | SESSION | Format field delimiter, default value: , | String |",
         "| flight_client_timeout       | 60      | 60      | SESSION | Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds | UInt64 |",
+        "| input_read_buffer_size      | 1048576 | 1048576 | SESSION | The size of buffer in bytes for input with format. By default, it is 1MB. | UInt64 |",
         "| group_by_two_level_threshold | 10000  | 10000   | SESSION | The threshold of keys to open two-level aggregation, default value: 10000 | UInt64 |",
         "| max_block_size              | 10000   | 10000   | SESSION | Maximum block size for reading | UInt64 |",
         "| max_execute_time            | 0       | 0       | SESSION | The maximum query execution time. it means no limit if the value is zero. default value: 0 | UInt64 |",

diff --git a/src/query/settings/src/lib.rs b/src/query/settings/src/lib.rs
index d14ad02bf5644..c1692883a80b9 100644
--- a/src/query/settings/src/lib.rs
+++ b/src/query/settings/src/lib.rs
@@ -152,6 +152,16 @@ impl Settings {
             desc: "The size of buffer in bytes for buffered reader of dal. By default, it is 1MB.",
             possible_values: None,
         },
+        SettingValue {
+            default_value: UserSettingValue::UInt64(1024 * 1024),
+            user_setting: UserSetting::create(
+                "input_read_buffer_size",
+                UserSettingValue::UInt64(1024 * 1024),
+            ),
+            level: ScopeLevel::Session,
+            desc: "The size of buffer in bytes for input with format. By default, it is 1MB.",
+            possible_values: None,
+        },
         // enable_new_processor_framework
         SettingValue {
             default_value: UserSettingValue::UInt64(1),
@@ -370,6 +380,11 @@ impl Settings {
         self.try_get_u64(key)
     }
 
+    pub fn get_input_read_buffer_size(&self) -> Result<u64> {
+        let key = "input_read_buffer_size";
+        self.try_get_u64(key)
+    }
+
     pub fn get_enable_new_processor_framework(&self) -> Result<u64> {
         let key = "enable_new_processor_framework";
         self.try_get_u64(key)

From b6a99f19532acb1c75a9f8384f10c6a9edc779b1 Mon Sep 17 00:00:00 2001
From: Yang Xiufeng
Date: Tue, 20 Sep 2022 08:10:28 +0800
Subject: [PATCH 15/22] use setting "input_read_buffer_size".

---
 .../src/processors/sources/input_formats/input_context.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
index 812e6cea7be5e..c3606a31e1811 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
@@ -98,7 +98,7 @@ impl InputContext {
             return Err(ErrorCode::BadArguments("no file to copy"));
         }
         let plan = Box::new(CopyIntoPlan { stage_info, files });
-        let read_batch_size = 1024 * 1024;
+        let read_batch_size = settings.get_input_read_buffer_size()?
as usize; let split_size = 128usize * 1024 * 1024; let file_format_options = &plan.stage_info.file_format_options; let format = Self::get_input_format(&file_format_options.format)?; @@ -150,7 +150,7 @@ impl InputContext { let format = StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?; let format = Self::get_input_format(&format)?; - let read_batch_size = 1024 * 1024; + let read_batch_size = settings.get_input_read_buffer_size()? as usize; let rows_per_block = settings.get_max_block_size()? as usize; let field_delimiter = settings.get_field_delimiter()?; let field_delimiter = { From 7bff6d039c1e3f0f5ffea94f95a98b79db5d4583 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 09:24:01 +0800 Subject: [PATCH 16/22] fix out of range --- .../src/processors/sources/input_formats/input_format_text.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index ca9e18535910e..0363dd6231d8d 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -197,11 +197,11 @@ impl AligningState { rows.push(i + 1 + size_last_remain) } } - let batch_end = rows[rows.len() - 1] - size_last_remain; if rows.is_empty() { self.tail_of_last_batch.extend_from_slice(buf); vec![] } else { + let batch_end = rows[rows.len() - 1] - size_last_remain; output.data = mem::take(&mut self.tail_of_last_batch); output.data.extend_from_slice(&buf[..batch_end]); self.tail_of_last_batch.extend_from_slice(&buf[batch_end..]); From 1f3008727dca24e6646e008324b277f91e7931db Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 09:26:23 +0800 Subject: [PATCH 17/22] remove mistaken Err, change to warning. --- .../processors/sources/input_formats/input_pipeline.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index d878d029296b1..3e691c792ad13 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -21,7 +21,6 @@ use common_base::base::tokio::sync::mpsc::Sender; use common_base::base::GlobalIORuntime; use common_base::base::TrySpawn; use common_datablocks::DataBlock; -use common_exception::ErrorCode; use common_exception::Result; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::Pipeline; @@ -247,10 +246,9 @@ pub trait InputFormatPipe: Sized + Send + 'static { } else { batch.truncate(n); tracing::debug!("read {} bytes", n); - batch_tx - .send(Ok(batch.into())) - .await - .map_err(|_| ErrorCode::UnexpectedError("fail to send ReadBatch"))?; + if let Err(e) = batch_tx.send(Ok(batch.into())).await { + tracing::warn!("fail to send ReadBatch: {}", e); + } } } tracing::debug!("finished"); From da8754dba9c786a0bfc1ccd53a0b98c6773ad143 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 09:26:53 +0800 Subject: [PATCH 18/22] fix CSV aligner. 
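The invariant this fix restores, sketched as a tiny carry-over struct (hypothetical, for illustration; the real fields are `out` and `n_end` on CsvReaderState): both the bytes of an incomplete trailing record and the count of field ends already collected must survive into the next read batch.

    // Illustration of the carry-over contract enforced below.
    struct Carry {
        out: Vec<u8>, // unescaped bytes of the incomplete trailing record
        n_end: usize, // field ends already collected for that record
    }

    impl Carry {
        // After draining complete rows from a batch, stash the remainder so
        // the next call to align resumes exactly mid-record.
        fn stash(&mut self, remainder: &[u8], ends_seen: usize) {
            self.out.extend_from_slice(remainder);
            self.n_end = ends_seen;
        }
    }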
--- .../input_formats/impls/input_format_csv.rs | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index 5060be0d83bf3..ad2f256f5e747 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -158,6 +158,10 @@ impl InputFormatTextBase for InputFormatCSV { } state.rows_to_skip -= 1; + tracing::debug!( + "csv aligner: skip a header row, remain {}", + state.rows_to_skip + ); state.rows += 1; endlen = 0; } @@ -179,7 +183,7 @@ impl InputFormatTextBase for InputFormatCSV { path: state.path.to_string(), batch_id: state.batch_id, offset: 0, - start_row: None, + start_row: Some(state.rows), }; while !buf.is_empty() { @@ -191,9 +195,7 @@ impl InputFormatTextBase for InputFormatCSV { endlen += n_end; out_pos += n_out; match result { - ReadRecordResult::InputEmpty => { - break; - } + ReadRecordResult::InputEmpty => break, ReadRecordResult::OutputFull => { return Err(csv_error( "output more than input", @@ -243,21 +245,37 @@ impl InputFormatTextBase for InputFormatCSV { } } + reader.n_end = endlen; + out_tmp.truncate(out_pos); if row_batch.row_ends.is_empty() { - reader.out.extend_from_slice(&out_tmp[..out_pos]); + tracing::debug!( + "csv aligner: {} + {} bytes => 0 rows", + reader.out.len(), + buf_in.len(), + ); + reader.out.extend_from_slice(&out_tmp); Ok(vec![]) } else { - state.rows += row_batch.row_ends.len(); let last_remain = mem::take(&mut reader.out); + + state.batch_id += 1; + state.rows += row_batch.row_ends.len(); reader.out.extend_from_slice(&out_tmp[row_batch_end..]); + + tracing::debug!( + "csv aligner: {} + {} bytes => {} rows + {} bytes remain", + last_remain.len(), + buf_in.len(), + row_batch.row_ends.len(), + reader.out.len() + ); + out_tmp.truncate(row_batch_end); - row_batch.start_row = Some(state.rows); row_batch.data = if last_remain.is_empty() { out_tmp } else { vec![last_remain, out_tmp].concat() }; - state.batch_id += 1; Ok(vec![row_batch]) } } From b84b9b1a6583dc612108850ad9ce70c3c9457dc8 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 10:04:29 +0800 Subject: [PATCH 19/22] fix logic test for new setting. --- tests/logictest/suites/base/06_show/06_0003_show_settings | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/logictest/suites/base/06_show/06_0003_show_settings b/tests/logictest/suites/base/06_show/06_0003_show_settings index d58223a13c5c8..399187f008dc6 100644 --- a/tests/logictest/suites/base/06_show/06_0003_show_settings +++ b/tests/logictest/suites/base/06_show/06_0003_show_settings @@ -17,6 +17,7 @@ enable_planner_v2 1 1 SESSION Enable planner v2 by setting this variable to field_delimiter , , SESSION Format field delimiter, default value: , String flight_client_timeout 60 60 SESSION Max duration the flight client request is allowed to take in seconds. By default, it is 60 seconds UInt64 group_by_two_level_threshold 10000 10000 SESSION The threshold of keys to open two-level aggregation, default value: 10000 UInt64 +input_read_buffer_size 1048576 1048576 SESSION The size of buffer in bytes for input with format. By default, it is 1MB. UInt64 max_block_size 10000 10000 SESSION Maximum block size for reading UInt64 max_execute_time 0 0 SESSION The maximum query execution time. 
it means no limit if the value is zero. default value: 0 UInt64 max_threads 11 16 SESSION The maximum number of threads to execute the request. By default, it is determined automatically. UInt64 From 4c4f0564ea12bf7d1262910742581ee3e776180a Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 10:04:44 +0800 Subject: [PATCH 20/22] fix toml fmt. --- src/query/pipeline/sources/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index 5ded4dfe2724d..b2130343ff2f7 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -35,4 +35,3 @@ parking_lot = "0.12.1" serde_json = "1.0.81" similar-asserts = "1.2.0" tracing = "0.1.35" - From 503fa5414291486157bfb8e4829fa9633f8158c9 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 20 Sep 2022 11:37:50 +0800 Subject: [PATCH 21/22] change SQL6 in mini_hits test to tolerate a bug. bug: https://github.com/datafuselabs/databend/issues/7743 --- tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh index 87f9acbeadd3c..a0f9f97c442bd 100755 --- a/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh +++ b/tests/suites/1_stateful/04_mini_dataset/04_0001_mini_hits.sh @@ -22,7 +22,8 @@ hits_statements=( "SELECT '====== SQL5 ======';" "SELECT COUNT(DISTINCT UserID) FROM hits;" "SELECT '====== SQL6 ======';" - "SELECT COUNT(DISTINCT SearchPhrase) FROM hits;" + #"SELECT COUNT(DISTINCT SearchPhrase) FROM hits;" # wait for bugfix https://github.com/datafuselabs/databend/issues/7743 + "SELECT COUNT(DISTINCT SearchPhrase) FROM (select SearchPhrase from hits order by SearchPhrase)" "SELECT '====== SQL7 ======';" "SELECT MIN(EventDate), MAX(EventDate) FROM hits;" "SELECT '====== SQL8 ======';" From 4effb76362a03e44eb4031c4e80b5a98a9cbfdb3 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Wed, 21 Sep 2022 06:27:38 +0800 Subject: [PATCH 22/22] catch up with update of arrow2. --- .../sources/input_formats/impls/input_format_parquet.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs index 5db6313a59c9c..a3354ba70190e 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -157,6 +157,7 @@ impl RowGroupInMemory { self.fields[f].clone(), self.meta.num_rows() as usize, None, + None, )?; column_chunks.push(array_iters); }
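For orientation at the end of the series, a hedged sketch of how the pieces compose (names from this series; the wrapper itself is hypothetical): COPY resolves files and builds an InputContext during read_partitions, then read2 delegates pipeline construction to the chosen format, which installs its aligner and deserializer processors.

    use std::sync::Arc;

    // Illustrative wrapper over the call made in StageTable::read2 above.
    fn build_copy_pipeline(
        input_ctx: Arc<InputContext>,
        pipeline: &mut Pipeline,
    ) -> common_exception::Result<()> {
        input_ctx.format.exec_copy(input_ctx.clone(), pipeline)
    }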