Skip to content

Commit

Permalink
GH-1436: Async Stop Containers
Browse files Browse the repository at this point in the history
Resolves #1436

Allow shutdown to be started but waiting to be completed asynchronously

Use Task Executor from parent

Update abstract parent to allow running to be set to false
  • Loading branch information
mjaggard authored and garyrussell committed Mar 31, 2022
1 parent 1533645 commit ff67612
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -109,6 +109,7 @@
* @author Arnaud Cogoluègnes
* @author Artem Bilan
* @author Mohammad Hewedy
* @author Mat Jaggard
*/
public abstract class AbstractMessageListenerContainer extends RabbitAccessor
implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean,
Expand Down Expand Up @@ -1331,10 +1332,14 @@ public void shutdown() {
throw convertRabbitAccessException(ex);
}
finally {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.lifecycleMonitor.notifyAll();
}
setNotRunning();
}
}

protected void setNotRunning() {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.lifecycleMonitor.notifyAll();
}
}

Expand Down Expand Up @@ -1420,20 +1425,7 @@ public void stop() {
throw convertRabbitAccessException(ex);
}
finally {
synchronized (this.lifecycleMonitor) {
this.running = false;
this.lifecycleMonitor.notifyAll();
}
}
}

@Override
public void stop(Runnable callback) {
try {
stop();
}
finally {
callback.run();
setNotRunning();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,6 +77,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Alex Panchenko
* @author Mat Jaggard
*
* @since 1.0
*/
Expand Down Expand Up @@ -605,59 +606,83 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process

@Override
protected void doShutdown() {
shutdownAndWaitOrCallback(null);
}

@Override
public void stop(Runnable callback) {
shutdownAndWaitOrCallback(() -> {
setNotRunning();
callback.run();
});
}

private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Thread thread = this.containerStoppingForAbort.get();
if (thread != null && !thread.equals(Thread.currentThread())) {
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
return;
}

try {
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
while (consumerIterator.hasNext()) {
BlockingQueueConsumer consumer = consumerIterator.next();
consumer.basicCancel(true);
canceledConsumers.add(consumer);
consumerIterator.remove();
if (consumer.declaring) {
consumer.thread.interrupt();
}
List<BlockingQueueConsumer> canceledConsumers = new ArrayList<>();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
while (consumerIterator.hasNext()) {
BlockingQueueConsumer consumer = consumerIterator.next();
consumer.basicCancel(true);
canceledConsumers.add(consumer);
consumerIterator.remove();
if (consumer.declaring) {
consumer.thread.interrupt();
}
}
}
else {
logger.info("Shutdown ignored - container is already stopped");
return;
}
}

Runnable awaitShutdown = () -> {
logger.info("Waiting for workers to finish.");
try {
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
if (finished) {
logger.info("Successfully waited for workers to finish.");
}
else {
logger.info("Shutdown ignored - container is already stopped");
return;
logger.info("Workers not finished.");
if (isForceCloseChannel()) {
canceledConsumers.forEach(consumer -> {
if (logger.isWarnEnabled()) {
logger.warn("Closing channel for unresponsive consumer: " + consumer);
}
consumer.stop();
});
}
}
}
logger.info("Waiting for workers to finish.");
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
if (finished) {
logger.info("Successfully waited for workers to finish.");
catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
}
else {
logger.info("Workers not finished.");
if (isForceCloseChannel()) {
canceledConsumers.forEach(consumer -> {
if (logger.isWarnEnabled()) {
logger.warn("Closing channel for unresponsive consumer: " + consumer);
}
consumer.stop();
});
}

synchronized (this.consumersMonitor) {
this.consumers = null;
this.cancellationLock.deactivate();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted waiting for workers. Continuing with shutdown.");
}

synchronized (this.consumersMonitor) {
this.consumers = null;
this.cancellationLock.deactivate();
if (callback != null) {
callback.run();
}
};
if (callback == null) {
awaitShutdown.run();
}
else {
getTaskExecutor().execute(awaitShutdown);
}

}

private boolean isActive(BlockingQueueConsumer consumer) {
Expand Down

0 comments on commit ff67612

Please sign in to comment.