Skip to content

Commit

Permalink
Extend the lease by default time
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 9, 2024
1 parent f261e3c commit 5c95c71
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void checkpoint(final String resumeToken, final long recordCount) {
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

public void extendLease() {
LOG.debug("Extending lease of stream partition for collection {}", streamPartition.getCollection());
enhancedSourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

/**
* This method is to reset checkpoint when change stream is invalid. The current thread will give up partition and new thread
* will take ownership of partition. If change stream is valid then new thread proceeds with processing change stream else the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
}
} else {
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("No records processed. Checkpoint to extend the lease of the worker");
partitionCheckpoint.checkpoint(null, 0);
LOG.debug("No records processed. Extend the lease of the partition worker.");
partitionCheckpoint.extendLease();
lastCheckpointTime = System.currentTimeMillis();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ public void resetCheckpoint_success() {
verify(streamProgressState).setLoadedRecords(0);
verify(streamProgressState).setLastUpdateTimestamp(anyLong());
}

@Test
public void extendLease_success() {
dataStreamPartitionCheckpoint.extendLease();
verify(enhancedSourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}
}

0 comments on commit 5c95c71

Please sign in to comment.