Skip to content

Commit

Permalink
NIFI-14283 Switched to non-static workerLock for ConsumeKinesisStream (
Browse files Browse the repository at this point in the history
…#9737)

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
dariuszseweryn authored Feb 24, 2025
1 parent 5b9c0e4 commit 96eea40
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
"leaseManagementConfig.failoverTimeMillis", FAILOVER_TIMEOUT
);

private static final Object WORKER_LOCK = new Object();
private final Object workerLock = new Object();
private static final String SCHEDULER_THREAD_NAME_TEMPLATE = ConsumeKinesisStream.class.getSimpleName() + "-" + Scheduler.class.getSimpleName() + "-";

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down Expand Up @@ -564,7 +564,7 @@ public void onScheduled(ProcessContext context) {
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
if (scheduler == null) {
synchronized (WORKER_LOCK) {
synchronized (workerLock) {
if (scheduler == null) {
final String schedulerId = generateSchedulerId();
getLogger().info("Starting Kinesis Scheduler {}", schedulerId);
Expand Down Expand Up @@ -594,7 +594,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
@OnStopped
public void stopConsuming(final ProcessContext context) {
if (scheduler != null) {
synchronized (WORKER_LOCK) {
synchronized (workerLock) {
if (scheduler != null) {
// indicate whether the processor has been Stopped; the Worker can be marked as SHUT_DOWN but still be waiting
// for ShardConsumers/RecordProcessors to complete, etc.
Expand Down

0 comments on commit 96eea40

Please sign in to comment.