Bump Apache Arrow to 2.0.0 #1231

Merged: 10 commits, Dec 16, 2020
2 changes: 1 addition & 1 deletion .github/workflows/build.wheel.sh
@@ -6,7 +6,7 @@ run_test() {
entry=$1
CPYTHON_VERSION=$($entry -c 'import sys; print(str(sys.version_info[0])+str(sys.version_info[1]))')
(cd wheelhouse && $entry -m pip install tensorflow_io-*-cp${CPYTHON_VERSION}-*.whl)
-$entry -m pip install -q pytest pytest-benchmark boto3 fastavro avro-python3 scikit-image pandas pyarrow==0.16.0 google-cloud-pubsub==2.1.0 google-cloud-bigquery-storage==1.1.0 google-cloud-bigquery==2.3.1 google-cloud-storage==1.32.0
+$entry -m pip install -q pytest pytest-benchmark boto3 fastavro avro-python3 scikit-image pandas pyarrow==2.0.0 google-cloud-pubsub==2.1.0 google-cloud-bigquery-storage==1.1.0 google-cloud-bigquery==2.3.1 google-cloud-storage==1.32.0
(cd tests && $entry -m pytest --benchmark-disable -v --import-mode=append $(find . -type f \( -iname "test_*.py" ! \( -iname "test_*_eager.py" \) \)))
(cd tests && $entry -m pytest --benchmark-disable -v --import-mode=append $(find . -type f \( -iname "test_*_eager.py" ! \( -iname "test_bigquery_eager.py" \) \)))
# GRPC and test_bigquery_eager tests have to be executed separately because of https://github.com/grpc/grpc/issues/20034
26 changes: 13 additions & 13 deletions WORKSPACE
@@ -256,22 +256,22 @@ http_archive(
http_archive(
name = "thrift",
build_file = "//third_party:thrift.BUILD",
-sha256 = "b7452d1873c6c43a580d2b4ae38cfaf8fa098ee6dc2925bae98dce0c010b1366",
-strip_prefix = "thrift-0.12.0",
+sha256 = "5da60088e60984f4f0801deeea628d193c33cec621e78c8a43a5d8c4055f7ad9",
+strip_prefix = "thrift-0.13.0",
urls = [
-"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/thrift/archive/0.12.0.tar.gz",
-"https://github.com/apache/thrift/archive/0.12.0.tar.gz",
+"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/thrift/archive/v0.13.0.tar.gz",
+"https://github.com/apache/thrift/archive/v0.13.0.tar.gz",
],
)

http_archive(
name = "arrow",
build_file = "//third_party:arrow.BUILD",
-sha256 = "d7b3838758a365c8c47d55ab0df1006a70db951c6964440ba354f81f518b8d8d",
-strip_prefix = "arrow-apache-arrow-0.16.0",
+sha256 = "ea299df9cf440cfc43393ce12ee6d9a4c9d0dfa9fde33c3bc9b70ec25520a844",
+strip_prefix = "arrow-apache-arrow-2.0.0",
urls = [
-"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-0.16.0.tar.gz",
-"https://github.com/apache/arrow/archive/apache-arrow-0.16.0.tar.gz",
+"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/arrow/archive/apache-arrow-2.0.0.tar.gz",
+"https://github.com/apache/arrow/archive/apache-arrow-2.0.0.tar.gz",
],
)

@@ -429,11 +429,11 @@ http_archive(

http_archive(
name = "com_github_google_flatbuffers",
-sha256 = "12a13686cab7ffaf8ea01711b8f55e1dbd3bf059b7c46a25fefa1250bdd9dd23",
-strip_prefix = "flatbuffers-b99332efd732e6faf60bb7ce1ce5902ed65d5ba3",
+sha256 = "62f2223fb9181d1d6338451375628975775f7522185266cd5296571ac152bc45",
+strip_prefix = "flatbuffers-1.12.0",
urls = [
-"https://storage.googleapis.com/mirror.tensorflow.org/github.com/google/flatbuffers/archive/b99332efd732e6faf60bb7ce1ce5902ed65d5ba3.tar.gz",
-"https://github.com/google/flatbuffers/archive/b99332efd732e6faf60bb7ce1ce5902ed65d5ba3.tar.gz",
+"https://storage.googleapis.com/mirror.tensorflow.org/github.com/google/flatbuffers/archive/v1.12.0.tar.gz",
+"https://github.com/google/flatbuffers/archive/v1.12.0.tar.gz",
],
)

@@ -676,7 +676,7 @@ http_archive(
patches = [
"//third_party:libapr1.patch",
],
-sha256 = "1a0909a1146a214a6ab9de28902045461901baab4e0ee43797539ec05b6dbae0",
+sha256 = "096968a363b2374f7450a3c65f3cc0b50561204a8da7bc03a2c39e080febd6e1",
strip_prefix = "apr-1.6.5",
urls = [
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/apache/apr/archive/1.6.5.tar.gz",
51 changes: 34 additions & 17 deletions tensorflow_io/arrow/kernels/arrow_dataset_ops.cc
@@ -15,6 +15,7 @@ limitations under the License.

#include "arrow/api.h"
#include "arrow/ipc/api.h"
+#include "arrow/result.h"
#include "arrow/util/io_util.h"
#include "tensorflow/core/framework/dataset.h"
#include "tensorflow/core/graph/graph.h"
@@ -476,12 +477,17 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase {
buffer_ = std::make_shared<arrow::Buffer>(dataset()->buffer_ptr_,
dataset()->buffer_size_);
buffer_reader_ = std::make_shared<arrow::io::BufferReader>(buffer_);
-CHECK_ARROW(arrow::ipc::RecordBatchFileReader::Open(
-buffer_reader_.get(), buffer_->size(), &reader_));
+arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
+result = arrow::ipc::RecordBatchFileReader::Open(
+buffer_reader_.get(), buffer_->size());
+CHECK_ARROW(result.status());
+reader_ = std::move(result).ValueUnsafe();
num_batches_ = reader_->num_record_batches();
if (num_batches_ > 0) {
-CHECK_ARROW(
-reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> result =
+reader_->ReadRecordBatch(current_batch_idx_);
+CHECK_ARROW(result.status());
+current_batch_ = std::move(result).ValueUnsafe();
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
}
return Status::OK();
@@ -491,8 +497,10 @@ class ArrowZeroCopyDatasetOp : public ArrowOpKernelBase {
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::NextStreamLocked(env);
if (++current_batch_idx_ < num_batches_) {
-CHECK_ARROW(
-reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+arrow::Result<std::shared_ptr<arrow::RecordBatch>> result =
+reader_->ReadRecordBatch(current_batch_idx_);
+CHECK_ARROW(result.status());
+current_batch_ = std::move(result).ValueUnsafe();
}
return Status::OK();
}
@@ -604,12 +612,14 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase {
const string& batches = dataset()->batches_.scalar<tstring>()();
auto buffer = std::make_shared<arrow::Buffer>(batches);
auto buffer_reader = std::make_shared<arrow::io::BufferReader>(buffer);
-CHECK_ARROW(
-arrow::ipc::RecordBatchFileReader::Open(buffer_reader, &reader_));
+auto result = arrow::ipc::RecordBatchFileReader::Open(buffer_reader);
+CHECK_ARROW(result.status());
+reader_ = std::move(result).ValueUnsafe();
num_batches_ = reader_->num_record_batches();
if (num_batches_ > 0) {
-CHECK_ARROW(
-reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+auto result = reader_->ReadRecordBatch(current_batch_idx_);
+CHECK_ARROW(result.status());
+current_batch_ = std::move(result).ValueUnsafe();
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
}
return Status::OK();
@@ -619,8 +629,9 @@ class ArrowSerializedDatasetOp : public ArrowOpKernelBase {
TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) override {
ArrowBaseIterator<Dataset>::NextStreamLocked(env);
if (++current_batch_idx_ < num_batches_) {
-CHECK_ARROW(
-reader_->ReadRecordBatch(current_batch_idx_, &current_batch_));
+auto result = reader_->ReadRecordBatch(current_batch_idx_);
+CHECK_ARROW(result.status());
+current_batch_ = std::move(result).ValueUnsafe();
}
return Status::OK();
}
@@ -736,14 +747,18 @@ class ArrowFeatherDatasetOp : public ArrowOpKernelBase {
new ArrowRandomAccessFile(tf_file.get(), size));

// Create the Feather reader
-std::unique_ptr<arrow::ipc::feather::TableReader> reader;
-CHECK_ARROW(arrow::ipc::feather::TableReader::Open(in_file, &reader));
+std::shared_ptr<arrow::ipc::feather::Reader> reader;
+arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
+arrow::ipc::feather::Reader::Open(in_file);
+CHECK_ARROW(result.status());
+reader = std::move(result).ValueUnsafe();

// Read file columns and build a table
-int64_t num_columns = reader->num_columns();
std::shared_ptr<::arrow::Table> table;
CHECK_ARROW(reader->Read(&table));

+int64_t num_columns = table->num_columns();

// Convert the table to a sequence of batches
arrow::TableBatchReader tr(*table.get());
std::shared_ptr<arrow::RecordBatch> batch;
@@ -885,8 +900,10 @@ class ArrowStreamDatasetOp : public ArrowOpKernelBase {
in_stream_ = socket_stream;
}

-CHECK_ARROW(arrow::ipc::RecordBatchStreamReader::Open(in_stream_.get(),
-&reader_));
+auto result =
+arrow::ipc::RecordBatchStreamReader::Open(in_stream_.get());
+CHECK_ARROW(result.status());
+reader_ = std::move(result).ValueUnsafe();
CHECK_ARROW(reader_->ReadNext(&current_batch_));
TF_RETURN_IF_ERROR(CheckBatchColumnTypes(current_batch_));
return Status::OK();
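The change repeated throughout this file is Arrow's move from out-parameter APIs returning arrow::Status to factory functions returning arrow::Result<T>. A minimal standalone sketch of the new style against the Arrow 2.0 headers (the function and variable names here are illustrative, and CHECK_ARROW above is tensorflow-io's own macro for turning an arrow::Status into a TensorFlow error):

// Sketch of the Result-based IPC reader API; not tensorflow-io code.
#include <memory>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result.h"

arrow::Status ReadFirstBatch(const std::shared_ptr<arrow::Buffer>& buffer,
                             std::shared_ptr<arrow::RecordBatch>* out) {
  arrow::io::BufferReader buffer_reader(buffer);
  // 0.16: Status Open(file, footer_offset, &reader)
  // 2.0:  Result<std::shared_ptr<RecordBatchFileReader>> Open(file, footer_offset)
  ARROW_ASSIGN_OR_RAISE(auto reader,
                        arrow::ipc::RecordBatchFileReader::Open(
                            &buffer_reader, buffer->size()));
  if (reader->num_record_batches() == 0) {
    return arrow::Status::Invalid("buffer contains no record batches");
  }
  // ReadRecordBatch(i) now returns the batch as a Result as well.
  ARROW_ASSIGN_OR_RAISE(*out, reader->ReadRecordBatch(0));
  return arrow::Status::OK();
}

The kernels above spell out CHECK_ARROW(result.status()) followed by std::move(result).ValueUnsafe() instead of using ARROW_ASSIGN_OR_RAISE because they return tensorflow::Status rather than arrow::Status; ValueUnsafe() is safe there only because the status was checked on the preceding line.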
31 changes: 17 additions & 14 deletions tensorflow_io/arrow/kernels/arrow_kernels.cc
@@ -161,10 +161,11 @@ class ArrowReadableFromMemoryInitOp
auto buffer_reader = std::make_shared<arrow::io::BufferReader>(buffer_);

std::shared_ptr<arrow::Schema> schema;
-arrow::Status status =
-arrow::ipc::ReadSchema(buffer_reader.get(), nullptr, &schema);
-OP_REQUIRES(context, status.ok(),
+arrow::Result<std::shared_ptr<arrow::Schema>> result =
+arrow::ipc::ReadSchema(buffer_reader.get(), nullptr);
+OP_REQUIRES(context, result.ok(),
errors::Internal("Error reading Arrow Schema"));
+schema = std::move(result).ValueUnsafe();

const Tensor* array_buffer_addrs_tensor;
OP_REQUIRES_OK(context, context->input("array_buffer_addresses",
@@ -429,10 +430,10 @@ class ListFeatherColumnsOp : public OpKernel {
::arrow::ipc::feather::fbs::GetCTable(buffer.data());

OP_REQUIRES(context,
-(table->version() >= ::arrow::ipc::feather::kFeatherVersion),
+(table->version() >= ::arrow::ipc::feather::kFeatherV1Version),
errors::InvalidArgument(
"feather file is old: ", table->version(), " vs. ",
-::arrow::ipc::feather::kFeatherVersion));
+::arrow::ipc::feather::kFeatherV1Version));

std::vector<string> columns;
std::vector<string> dtypes;
@@ -577,10 +578,10 @@ class FeatherReadable : public IOReadableInterface {
const ::arrow::ipc::feather::fbs::CTable* table =
::arrow::ipc::feather::fbs::GetCTable(buffer.data());

-if (table->version() < ::arrow::ipc::feather::kFeatherVersion) {
+if (table->version() < ::arrow::ipc::feather::kFeatherV1Version) {
return errors::InvalidArgument("feather file is old: ", table->version(),
" vs. ",
-::arrow::ipc::feather::kFeatherVersion);
+::arrow::ipc::feather::kFeatherV1Version);
}

for (size_t i = 0; i < table->columns()->size(); i++) {
@@ -683,18 +684,20 @@ class FeatherReadable : public IOReadableInterface {

if (feather_file_.get() == nullptr) {
feather_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));
-arrow::Status s =
-arrow::ipc::feather::TableReader::Open(feather_file_, &reader_);
-if (!s.ok()) {
-return errors::Internal(s.ToString());
+arrow::Result<std::shared_ptr<arrow::ipc::feather::Reader>> result =
+arrow::ipc::feather::Reader::Open(feather_file_);
+if (!result.ok()) {
+return errors::Internal(result.status().ToString());
}
+reader_ = std::move(result).ValueUnsafe();
}

-std::shared_ptr<arrow::ChunkedArray> column;
-arrow::Status s = reader_->GetColumn(column_index, &column);
+std::shared_ptr<arrow::Table> table;
+arrow::Status s = reader_->Read(&table);
if (!s.ok()) {
return errors::Internal(s.ToString());
}
+std::shared_ptr<arrow::ChunkedArray> column = table->column(column_index);

std::shared_ptr<::arrow::ChunkedArray> slice =
column->Slice(element_start, element_stop);
@@ -767,7 +770,7 @@ class FeatherReadable : public IOReadableInterface {
std::unique_ptr<SizedRandomAccessFile> file_ TF_GUARDED_BY(mu_);
uint64 file_size_ TF_GUARDED_BY(mu_);
std::shared_ptr<ArrowRandomAccessFile> feather_file_ TF_GUARDED_BY(mu_);
-std::unique_ptr<arrow::ipc::feather::TableReader> reader_ TF_GUARDED_BY(mu_);
+std::shared_ptr<arrow::ipc::feather::Reader> reader_ TF_GUARDED_BY(mu_);

std::vector<DataType> dtypes_;
std::vector<TensorShape> shapes_;
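Arrow 1.0 replaced feather::TableReader, whose num_columns()/GetColumn() accessors the old code used, with feather::Reader, which materializes a whole arrow::Table; the version constant was renamed to kFeatherV1Version once the IPC-based Feather V2 format appeared. A sketch of the new read path, assuming a hypothetical file path:

// Sketch: reading one Feather column through the Arrow 2.0 API.
#include <memory>
#include <string>

#include "arrow/api.h"
#include "arrow/io/file.h"
#include "arrow/ipc/feather.h"
#include "arrow/result.h"

arrow::Status ReadFeatherColumn(const std::string& path, int column_index,
                                std::shared_ptr<arrow::ChunkedArray>* out) {
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(path));
  // Reader::Open returns Result<std::shared_ptr<Reader>>.
  ARROW_ASSIGN_OR_RAISE(auto reader, arrow::ipc::feather::Reader::Open(file));
  // There is no per-column GetColumn() anymore: read the whole table and
  // then pick the column, as the kernels above now do.
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(reader->Read(&table));
  *out = table->column(column_index);
  return arrow::Status::OK();
}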
8 changes: 6 additions & 2 deletions tensorflow_io/arrow/kernels/arrow_kernels.h
@@ -51,8 +51,12 @@ class ArrowRandomAccessFile : public ::arrow::io::RandomAccessFile {
return result.size();
}
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override {
-std::shared_ptr<arrow::ResizableBuffer> buffer;
-RETURN_NOT_OK(AllocateResizableBuffer(nbytes, &buffer));
+arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+arrow::AllocateResizableBuffer(nbytes);
+ARROW_RETURN_NOT_OK(result);
+std::shared_ptr<arrow::ResizableBuffer> buffer =
+std::move(result).ValueUnsafe();

ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
Read(nbytes, buffer->mutable_data()));
RETURN_NOT_OK(buffer->Resize(bytes_read));
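AllocateResizableBuffer made the same switch: it now returns an arrow::Result (wrapping a std::unique_ptr<ResizableBuffer> in Arrow 2.0) instead of filling an out-parameter, and the identical allocation pattern recurs in the two stream-client files below. When the enclosing function already returns an arrow::Result or arrow::Status, ARROW_ASSIGN_OR_RAISE collapses the check-then-unwrap sequence to one line; a condensed sketch under that assumption:

// Sketch of a Read(nbytes)-style body using ARROW_ASSIGN_OR_RAISE; `file`
// stands in for whatever underlying reader the real class wraps.
#include <memory>

#include "arrow/api.h"
#include "arrow/io/api.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::Buffer>> ReadToBuffer(
    arrow::io::RandomAccessFile* file, int64_t nbytes) {
  // The returned unique_ptr converts into the shared_ptr declared here.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ResizableBuffer> buffer,
                        arrow::AllocateResizableBuffer(nbytes));
  ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                        file->Read(nbytes, buffer->mutable_data()));
  // Shrink to the bytes actually read before handing the buffer out.
  ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read));
  return std::shared_ptr<arrow::Buffer>(std::move(buffer));
}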
7 changes: 5 additions & 2 deletions tensorflow_io/arrow/kernels/arrow_stream_client_unix.cc
@@ -132,8 +132,11 @@ arrow::Result<int64_t> ArrowStreamClient::Read(int64_t nbytes, void* out) {

arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowStreamClient::Read(
int64_t nbytes) {
-std::shared_ptr<arrow::ResizableBuffer> buffer;
-ARROW_RETURN_NOT_OK(arrow::AllocateResizableBuffer(nbytes, &buffer));
+arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+arrow::AllocateResizableBuffer(nbytes);
+ARROW_RETURN_NOT_OK(result);
+std::shared_ptr<arrow::ResizableBuffer> buffer =
+std::move(result).ValueUnsafe();
int64_t bytes_read;
ARROW_ASSIGN_OR_RAISE(bytes_read, Read(nbytes, buffer->mutable_data()));
ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));
7 changes: 5 additions & 2 deletions tensorflow_io/arrow/kernels/arrow_stream_client_windows.cc
@@ -154,8 +154,11 @@ arrow::Result<int64_t> ArrowStreamClient::Read(int64_t nbytes, void* out) {

arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowStreamClient::Read(
int64_t nbytes) {
-std::shared_ptr<arrow::ResizableBuffer> buffer;
-ARROW_RETURN_NOT_OK(arrow::AllocateResizableBuffer(nbytes, &buffer));
+arrow::Result<std::shared_ptr<arrow::ResizableBuffer>> result =
+arrow::AllocateResizableBuffer(nbytes);
+ARROW_RETURN_NOT_OK(result);
+std::shared_ptr<arrow::ResizableBuffer> buffer =
+std::move(result).ValueUnsafe();
int64_t bytes_read;
ARROW_ASSIGN_OR_RAISE(bytes_read, Read(nbytes, buffer->mutable_data()));
ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));
9 changes: 5 additions & 4 deletions tensorflow_io/bigquery/kernels/bigquery_dataset_op.cc
@@ -102,11 +102,12 @@ class BigQueryDatasetOp : public DatasetOpKernel {

arrow::ipc::DictionaryMemo dict_memo;
arrow::io::BufferReader input(buffer_);
-arrow::Status arrow_status =
-arrow::ipc::ReadSchema(&input, &dict_memo, &arrow_schema_);
-OP_REQUIRES(ctx, arrow_status.ok(),
+arrow::Result<std::shared_ptr<arrow::Schema>> result =
+arrow::ipc::ReadSchema(&input, &dict_memo);
+OP_REQUIRES(ctx, result.ok(),
errors::Internal("Error reading Arrow Schema",
-arrow_status.message()));
+result.status().message()));
+arrow_schema_ = std::move(result).ValueUnsafe();
} else {
ctx->CtxFailure(errors::InvalidArgument("Invalid data_format"));
}
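arrow::ipc::ReadSchema now returns the schema as a Result rather than writing through a pointer. A minimal sketch, assuming `buffer` holds IPC-serialized schema bytes such as those the BigQuery Storage read API returns:

// Sketch: deserializing an IPC-encoded schema under Arrow 2.0.
#include <memory>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::Schema>> DecodeSchema(
    const std::shared_ptr<arrow::Buffer>& buffer,
    arrow::ipc::DictionaryMemo* dict_memo) {
  arrow::io::BufferReader input(buffer);
  // 0.16: Status ReadSchema(stream, memo, &schema)
  // 2.0:  Result<std::shared_ptr<Schema>> ReadSchema(stream, memo)
  return arrow::ipc::ReadSchema(&input, dict_memo);
}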
11 changes: 6 additions & 5 deletions tensorflow_io/bigquery/kernels/bigquery_lib.h
@@ -224,12 +224,13 @@ class BigQueryReaderArrowDatasetIterator
arrow::io::BufferReader buffer_reader_(buffer_);
arrow::ipc::DictionaryMemo dict_memo;

-auto arrow_status =
-arrow::ipc::ReadRecordBatch(this->dataset()->arrow_schema(), &dict_memo,
-&buffer_reader_, &this->record_batch_);
-if (!arrow_status.ok()) {
-return errors::Internal(arrow_status.ToString());
+auto result = arrow::ipc::ReadRecordBatch(
+this->dataset()->arrow_schema(), &dict_memo,
+arrow::ipc::IpcReadOptions::Defaults(), &buffer_reader_);
+if (!result.ok()) {
+return errors::Internal(result.status().ToString());
}
+this->record_batch_ = std::move(result).ValueUnsafe();

VLOG(3) << "got record batch, rows:" << record_batch_->num_rows();

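Besides returning a Result, arrow::ipc::ReadRecordBatch gained a mandatory IpcReadOptions argument and moved the input stream to the last position, which is why the call above is reshuffled rather than just rewrapped. A sketch with illustrative names:

// Sketch: decoding one serialized record batch against a known schema.
#include <memory>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::RecordBatch>> DecodeBatch(
    const std::shared_ptr<arrow::Schema>& schema,
    const std::shared_ptr<arrow::Buffer>& serialized) {
  arrow::ipc::DictionaryMemo dict_memo;
  arrow::io::BufferReader stream(serialized);
  // 2.0 signature: (schema, dictionary_memo, options, stream).
  return arrow::ipc::ReadRecordBatch(schema, &dict_memo,
                                     arrow::ipc::IpcReadOptions::Defaults(),
                                     &stream);
}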
25 changes: 14 additions & 11 deletions tensorflow_io/core/kernels/csv_kernels.cc
@@ -44,19 +44,24 @@ class CSVReadable : public IOReadableInterface {

csv_file_.reset(new ArrowRandomAccessFile(file_.get(), file_size_));

-::arrow::Status status;
-
-status = ::arrow::csv::TableReader::Make(
+auto result = ::arrow::csv::TableReader::Make(
::arrow::default_memory_pool(), csv_file_,
::arrow::csv::ReadOptions::Defaults(),
::arrow::csv::ParseOptions::Defaults(),
-::arrow::csv::ConvertOptions::Defaults(), &reader_);
-if (!status.ok()) {
-return errors::InvalidArgument("unable to make a TableReader: ", status);
+::arrow::csv::ConvertOptions::Defaults());
+if (!result.status().ok()) {
+return errors::InvalidArgument("unable to make a TableReader: ",
+result.status());
}
-status = reader_->Read(&table_);
-if (!status.ok()) {
-return errors::InvalidArgument("unable to read table: ", status);
+reader_ = std::move(result).ValueUnsafe();
+
+{
+auto result = reader_->Read();
+if (!result.status().ok()) {
+return errors::InvalidArgument("unable to read table: ",
+result.status());
+}
+table_ = std::move(result).ValueUnsafe();
}

for (int i = 0; i < table_->num_columns(); i++) {
@@ -108,11 +113,9 @@ class CSVReadable : public IOReadableInterface {
case ::arrow::Type::TIMESTAMP:
case ::arrow::Type::TIME32:
case ::arrow::Type::TIME64:
-case ::arrow::Type::INTERVAL:
case ::arrow::Type::DECIMAL:
case ::arrow::Type::LIST:
case ::arrow::Type::STRUCT:
-case ::arrow::Type::UNION:
case ::arrow::Type::DICTIONARY:
case ::arrow::Type::MAP:
default:
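The CSV factory follows the same migration: TableReader::Make drops its trailing out-parameter and returns a Result, and Read() hands back the table directly. The two deleted switch cases reflect Arrow 1.0 splitting Type::INTERVAL into INTERVAL_MONTHS/INTERVAL_DAY_TIME and Type::UNION into SPARSE_UNION/DENSE_UNION. A self-contained sketch with a hypothetical file path:

// Sketch: reading a CSV file into an arrow::Table with Arrow 2.0.
#include <memory>
#include <string>

#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/io/file.h"
#include "arrow/result.h"

arrow::Result<std::shared_ptr<arrow::Table>> ReadCsv(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));
  // Make() now returns Result<std::shared_ptr<TableReader>>.
  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::TableReader::Make(arrow::default_memory_pool(), input,
                                    arrow::csv::ReadOptions::Defaults(),
                                    arrow::csv::ParseOptions::Defaults(),
                                    arrow::csv::ConvertOptions::Defaults()));
  // Read() likewise returns Result<std::shared_ptr<Table>>.
  return reader->Read();
}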
4 changes: 2 additions & 2 deletions tests/test_arrow_eager.py
@@ -508,7 +508,7 @@ def test_arrow_feather_dataset(self):

# Create a tempfile that is deleted after tests run
with tempfile.NamedTemporaryFile(delete=False) as f:
-write_feather(df, f)
+write_feather(df, f, version=1)

# test single file
dataset = arrow_io.ArrowFeatherDataset(
@@ -1143,7 +1143,7 @@ def test_arrow_list_feather_columns(self):

# Create a tempfile that is deleted after tests run
with tempfile.NamedTemporaryFile(delete=False) as f:
-write_feather(df, f)
+write_feather(df, f, version=1)

# test single file
# prefix "file://" to test scheme file system (e.g., s3, gcs, azfs, ignite)
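Note on the version=1 argument: with pyarrow 2.0, write_feather defaults to the IPC-based Feather V2 format, while the kernels above still parse the classic V1 flatbuffer metadata; pinning version=1 keeps the test files readable.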