diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java index 59ece03cbdb2..40439d53a3a5 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/ConsumeKinesisStream.java @@ -345,7 +345,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor RELATIONSHIPS = Set.of( @@ -564,7 +564,7 @@ public void onScheduled(ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { if (scheduler == null) { - synchronized (WORKER_LOCK) { + synchronized (workerLock) { if (scheduler == null) { final String schedulerId = generateSchedulerId(); getLogger().info("Starting Kinesis Scheduler {}", schedulerId); @@ -594,7 +594,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session @OnStopped public void stopConsuming(final ProcessContext context) { if (scheduler != null) { - synchronized (WORKER_LOCK) { + synchronized (workerLock) { if (scheduler != null) { // indicate whether the processor has been Stopped; the Worker can be marked as SHUT_DOWN but still be waiting // for ShardConsumers/RecordProcessors to complete, etc.