From 49f0ba32112c27c7d3d2c57030e20d66c07c77ba Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 10 Jul 2024 19:57:13 -0400 Subject: [PATCH 1/6] Increase retry backoff for Storage API batch --- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index ce5e7b4854e9..8ace778d1608 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -967,8 +967,8 @@ void flushAll( RetryManager retryManager = new RetryManager<>( - Duration.standardSeconds(1), - Duration.standardSeconds(20), + Duration.standardSeconds(5), + Duration.standardSeconds(60), maxRetries, BigQuerySinkMetrics.throttledTimeCounter( BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); From cbbe3f004a4d22a36745cedae83a4bd3cb1b28fb Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 10 Jul 2024 21:37:48 -0400 Subject: [PATCH 2/6] longer waits for quota error only --- .../sdk/io/gcp/bigquery/RetryManager.java | 39 +++++++++++++++---- .../StorageApiWriteUnshardedRecords.java | 7 +++- sdks/python/calc.py | 14 +++++++ 3 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 sdks/python/calc.py diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java index 1a7202de0a56..f5d1e56ad83e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java @@ -17,11 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import java.io.IOException; import java.time.Instant; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -52,6 +51,9 @@ class RetryManager> { private Queue> operations; private final BackOff backoff; + + // Longer backoff for quota errors because AppendRows throughput takes a long time to cool off + private final BackOff quotaBackoff; private static final ExecutorService executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("BeamBQRetryManager-%d").build()); @@ -61,7 +63,9 @@ enum RetryType { // The in-flight operations will not be retried. DONT_RETRY, // All operations will be retried. - RETRY_ALL_OPERATIONS + RETRY_ALL_OPERATIONS, + // Retry operations due to a quota error. Tells RetryManager to wait longer between retries + RETRY_QUOTA }; static class WrappedFailure extends Throwable { @@ -85,6 +89,13 @@ Object getResult() { .withMaxBackoff(maxBackoff) .withMaxRetries(maxRetries) .backoff(); + + quotaBackoff = + FluentBackoff.DEFAULT + .withInitialBackoff(initialBackoff.multipliedBy(5)) + .withMaxBackoff(maxBackoff.multipliedBy(3)) + .withMaxRetries(maxRetries) + .backoff(); } RetryManager( @@ -97,6 +108,14 @@ Object getResult() { .withMaxRetries(maxRetries) .withThrottledTimeCounter(throttledTimeCounter) .backoff(); + + quotaBackoff = + FluentBackoff.DEFAULT + .withInitialBackoff(initialBackoff.multipliedBy(5)) + .withMaxBackoff(maxBackoff.multipliedBy(3)) + .withMaxRetries(maxRetries) + .withThrottledTimeCounter(throttledTimeCounter) + .backoff(); } static class Operation> { @@ -313,10 +332,9 @@ void await() throws Exception { if (retryType == RetryType.DONT_RETRY) { operations.clear(); } else { - checkState(RetryType.RETRY_ALL_OPERATIONS == retryType); - if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - throw new RuntimeException(failure); - } + sleepOrFail( + retryType == RetryType.RETRY_ALL_OPERATIONS ? backoff : quotaBackoff, failure); + for (Operation awaitOperation : operations) { awaitOperation.await(); } @@ -330,4 +348,11 @@ void await() throws Exception { } } } + + private void sleepOrFail(BackOff backoff, @Nullable Throwable failure) + throws IOException, InterruptedException { + if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + throw new RuntimeException(failure); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8ace778d1608..11e531e5c9c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -811,6 +811,9 @@ long flush( .inc(numRowsRetried); appendFailures.inc(); + if (quotaError) { + return RetryType.RETRY_QUOTA; + } return RetryType.RETRY_ALL_OPERATIONS; }, c -> { @@ -967,8 +970,8 @@ void flushAll( RetryManager retryManager = new RetryManager<>( - Duration.standardSeconds(5), - Duration.standardSeconds(60), + Duration.standardSeconds(1), + Duration.standardSeconds(20), maxRetries, BigQuerySinkMetrics.throttledTimeCounter( BigQuerySinkMetrics.RpcMethod.APPEND_ROWS)); diff --git a/sdks/python/calc.py b/sdks/python/calc.py new file mode 100644 index 000000000000..afd1fd6ec526 --- /dev/null +++ b/sdks/python/calc.py @@ -0,0 +1,14 @@ + + +initial = 1 +backoff_multiplier = 1.5 + +total = 0 + +for i in range(5): + total += initial + initial *= backoff_multiplier + if initial > 20: + initial = 20 + +print(total) \ No newline at end of file From 8019b8dd1fd9a3256c5dbe6baae9198a7cc62be6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 10 Jul 2024 21:39:13 -0400 Subject: [PATCH 3/6] cleanup --- sdks/python/calc.py | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 sdks/python/calc.py diff --git a/sdks/python/calc.py b/sdks/python/calc.py deleted file mode 100644 index afd1fd6ec526..000000000000 --- a/sdks/python/calc.py +++ /dev/null @@ -1,14 +0,0 @@ - - -initial = 1 -backoff_multiplier = 1.5 - -total = 0 - -for i in range(5): - total += initial - initial *= backoff_multiplier - if initial > 20: - initial = 20 - -print(total) \ No newline at end of file From 347ade2fd6e2d254172d1fb096576823ca6dbd13 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 12 Jul 2024 16:41:42 -0400 Subject: [PATCH 4/6] add to CHANGES.md --- CHANGES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index fc94877a2bb3..243596e6f2eb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). * Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721)) +* [BigQueryIO] Better handling for batch Storage Write API when it hits AppendRows throughput quota ([#31837](https://github.com/apache/beam/pull/31837)) * [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)) * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)). * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)]) @@ -83,7 +84,7 @@ ## Bugfixes -* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) +* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Security Fixes From 72c896817c1f46f98d2db581514fda7ea1b899d8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 13:29:16 -0400 Subject: [PATCH 5/6] no need for quota backoff. just increase allowed retries --- .../sdk/io/gcp/bigquery/RetryManager.java | 37 +++---------------- .../StorageApiWriteUnshardedRecords.java | 5 +-- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java index f5d1e56ad83e..01ae7d5ae2d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import java.io.IOException; import java.time.Instant; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -51,9 +52,6 @@ class RetryManager> { private Queue> operations; private final BackOff backoff; - - // Longer backoff for quota errors because AppendRows throughput takes a long time to cool off - private final BackOff quotaBackoff; private static final ExecutorService executor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("BeamBQRetryManager-%d").build()); @@ -64,8 +62,6 @@ enum RetryType { DONT_RETRY, // All operations will be retried. RETRY_ALL_OPERATIONS, - // Retry operations due to a quota error. Tells RetryManager to wait longer between retries - RETRY_QUOTA }; static class WrappedFailure extends Throwable { @@ -89,13 +85,6 @@ Object getResult() { .withMaxBackoff(maxBackoff) .withMaxRetries(maxRetries) .backoff(); - - quotaBackoff = - FluentBackoff.DEFAULT - .withInitialBackoff(initialBackoff.multipliedBy(5)) - .withMaxBackoff(maxBackoff.multipliedBy(3)) - .withMaxRetries(maxRetries) - .backoff(); } RetryManager( @@ -108,14 +97,6 @@ Object getResult() { .withMaxRetries(maxRetries) .withThrottledTimeCounter(throttledTimeCounter) .backoff(); - - quotaBackoff = - FluentBackoff.DEFAULT - .withInitialBackoff(initialBackoff.multipliedBy(5)) - .withMaxBackoff(maxBackoff.multipliedBy(3)) - .withMaxRetries(maxRetries) - .withThrottledTimeCounter(throttledTimeCounter) - .backoff(); } static class Operation> { @@ -332,9 +313,10 @@ void await() throws Exception { if (retryType == RetryType.DONT_RETRY) { operations.clear(); } else { - sleepOrFail( - retryType == RetryType.RETRY_ALL_OPERATIONS ? backoff : quotaBackoff, failure); - + checkState(RetryType.RETRY_ALL_OPERATIONS == retryType); + if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + throw new RuntimeException(failure); + } for (Operation awaitOperation : operations) { awaitOperation.await(); } @@ -348,11 +330,4 @@ void await() throws Exception { } } } - - private void sleepOrFail(BackOff backoff, @Nullable Throwable failure) - throws IOException, InterruptedException { - if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - throw new RuntimeException(failure); - } - } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 7e3093cf81d7..f0c4a56ed3d4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -771,7 +771,7 @@ long flush( invalidateWriteStream(); allowedRetry = 5; } else { - allowedRetry = 10; + allowedRetry = 35; } // Maximum number of times we retry before we fail the work item. @@ -819,9 +819,6 @@ long flush( .inc(numRowsRetried); appendFailures.inc(); - if (quotaError) { - return RetryType.RETRY_QUOTA; - } return RetryType.RETRY_ALL_OPERATIONS; }, c -> { From a99b776fe282dd30333bc21214fdff0f3c5602e8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 15 Jul 2024 13:29:50 -0400 Subject: [PATCH 6/6] cleanup --- .../java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java index 01ae7d5ae2d8..1a7202de0a56 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java @@ -61,7 +61,7 @@ enum RetryType { // The in-flight operations will not be retried. DONT_RETRY, // All operations will be retried. - RETRY_ALL_OPERATIONS, + RETRY_ALL_OPERATIONS }; static class WrappedFailure extends Throwable {