From 66af1ae1bcdb858a68a74c1351736fe4fcc983b8 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 6 Feb 2024 02:34:13 -0800 Subject: [PATCH 1/2] [Dataflow Streaming] Invalidate caches and remove work on failure before commit --- .../worker/StreamingDataflowWorker.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 3ba27bd852fc..7da9d8890dc2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1397,12 +1397,25 @@ private void commitLoop() { // Adds the commit to the commitStream if it fits, returning true iff it is consumed. private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) { Preconditions.checkNotNull(commit); + final ComputationState state = commit.computationState(); + final Windmill.WorkItemCommitRequest request = commit.request(); // Drop commits for failed work. Such commits will be dropped by Windmill anyway. if (commit.work().isFailed()) { + readerCache.invalidateReader( + WindmillComputationKey.create( + state.getComputationId(), request.getKey(), request.getShardingKey())); + stateCache + .forComputation(state.getComputationId()) + .invalidate(request.getKey(), request.getShardingKey()); + try { + state.completeWorkAndScheduleNextWorkForKey( + ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); + } catch (RuntimeException e) { + LOG.warn("completeWorkAndScheduleNextWorkForKey on failed work threw", e); + } return true; } - final ComputationState state = commit.computationState(); - final Windmill.WorkItemCommitRequest request = commit.request(); + final int size = commit.getSize(); commit.work().setState(Work.State.COMMITTING); activeCommitBytes.addAndGet(size); From b29a3973238f4117b6fabd1386ba0b922faf0c88 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 6 Feb 2024 02:52:13 -0800 Subject: [PATCH 2/2] Prevent completeWorkAndScheduleNextWorkForKey from throwing --- .../worker/StreamingDataflowWorker.java | 10 ++-------- .../worker/streaming/ActiveWorkState.java | 17 +++++++---------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 7da9d8890dc2..14efdcc5eb02 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1407,12 +1407,8 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) stateCache .forComputation(state.getComputationId()) .invalidate(request.getKey(), request.getShardingKey()); - try { - state.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); - } catch (RuntimeException e) { - LOG.warn("completeWorkAndScheduleNextWorkForKey on failed work threw", e); - } + state.completeWorkAndScheduleNextWorkForKey( + ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); return true; } @@ -1432,8 +1428,6 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) .invalidate(request.getKey(), request.getShardingKey()); } activeCommitBytes.addAndGet(-size); - // This may throw an exception if the commit was not active, which is possible if it - // was deemed stuck. state.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 54942dfeee1f..ff46356d9569 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -188,16 +188,13 @@ synchronized Optional completeWorkAndGetNextWorkForKey( private synchronized void removeCompletedWorkFromQueue( Queue workQueue, ShardedKey shardedKey, long workToken) { - // avoid Preconditions.checkState here to prevent eagerly evaluating the - // format string parameters for the error message. - Work completedWork = - Optional.ofNullable(workQueue.peek()) - .orElseThrow( - () -> - new IllegalStateException( - String.format( - "Active key %s without work, expected token %d", - shardedKey, workToken))); + Work completedWork = workQueue.peek(); + if (completedWork == null) { + // Work may have been completed due to clearing of stuck commits. + LOG.warn( + String.format("Active key %s without work, expected token %d", shardedKey, workToken)); + return; + } if (completedWork.getWorkItem().getWorkToken() != workToken) { // Work may have been completed due to clearing of stuck commits.