Skip to content

Commit

Permalink
[CherryPick #30229] [Dataflow Streaming] Invalidate caches and remove…
Browse files Browse the repository at this point in the history
… work on failure before commit (#30234)

* Invalidate caches and remove work on failure before commit

* Prevent completeWorkAndScheduleNextWorkForKey from throwing

---------

Co-authored-by: Arun Pandian <[email protected]>
  • Loading branch information
lostluck and arunpandianp authored Feb 6, 2024
1 parent c726525 commit 782a78b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1385,12 +1385,21 @@ private void commitLoop() {
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
Preconditions.checkNotNull(commit);
final ComputationState state = commit.computationState();
final Windmill.WorkItemCommitRequest request = commit.request();
// Drop commits for failed work. Such commits will be dropped by Windmill anyway.
if (commit.work().isFailed()) {
readerCache.invalidateReader(
WindmillComputationKey.create(
state.getComputationId(), request.getKey(), request.getShardingKey()));
stateCache
.forComputation(state.getComputationId())
.invalidate(request.getKey(), request.getShardingKey());
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
return true;
}
final ComputationState state = commit.computationState();
final Windmill.WorkItemCommitRequest request = commit.request();

final int size = commit.getSize();
commit.work().setState(Work.State.COMMITTING);
activeCommitBytes.addAndGet(size);
Expand All @@ -1407,8 +1416,6 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
.invalidate(request.getKey(), request.getShardingKey());
}
activeCommitBytes.addAndGet(-size);
// This may throw an exception if the commit was not active, which is possible if it
// was deemed stuck.
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,13 @@ synchronized Optional<Work> completeWorkAndGetNextWorkForKey(

private synchronized void removeCompletedWorkFromQueue(
Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
Work completedWork =
Optional.ofNullable(workQueue.peek())
.orElseThrow(
() ->
new IllegalStateException(
String.format(
"Active key %s without work, expected token %d",
shardedKey, workToken)));
Work completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn(
String.format("Active key %s without work, expected token %d", shardedKey, workToken));
return;
}

if (completedWork.getWorkItem().getWorkToken() != workToken) {
// Work may have been completed due to clearing of stuck commits.
Expand Down

0 comments on commit 782a78b

Please sign in to comment.