diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java index 5cfa2adc78..2ea39028c9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.springframework.amqp.AmqpException; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -38,7 +39,7 @@ * @since 1.3 */ public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, - InitializingBean { + InitializingBean, DisposableBean { private final Map targetConnectionFactories = new ConcurrentHashMap(); @@ -260,4 +261,15 @@ protected ConnectionFactory removeTargetConnectionFactory(Object key) { @Nullable protected abstract Object determineCurrentLookupKey(); + @Override + public void destroy() { + resetConnection(); + } + + @Override + public void resetConnection() { + this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection()); + this.defaultTargetConnectionFactory.resetConnection(); + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index 0fa960684b..15504f2202 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -882,6 +882,7 @@ public final void destroy() { * used to force a reconnect to the primary broker after failing over to a secondary * broker. */ + @Override public void resetConnection() { synchronized (this.connectionMonitor) { if (this.connection.target != null) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java index 4b1e049be6..2f4323b540 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -83,4 +83,12 @@ default boolean isPublisherReturns() { return false; } + /** + * Close any connection(s) that might be cached by this factory. This does not prevent + * new connections from being opened. + * @since 2.4.4 + */ + default void resetConnection() { + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java index fc04a7319d..e209734ec6 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -350,7 +350,7 @@ protected ConnectionFactory createConnectionFactory(String address, String node) } @Override - public void destroy() { + public void resetConnection() { Exception lastException = null; for (ConnectionFactory connectionFactory : this.nodeFactories.values()) { if (connectionFactory instanceof DisposableBean) { @@ -367,4 +367,9 @@ public void destroy() { } } + @Override + public void destroy() { + resetConnection(); + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java index 391874f301..15add4091f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java @@ -149,6 +149,7 @@ public synchronized Connection createConnection() throws AmqpException { * used to force a reconnect to the primary broker after failing over to a secondary * broker. */ + @Override public void resetConnection() { destroy(); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java index 35a28687d8..0da501fe84 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -140,6 +140,7 @@ public void closeThreadChannel() { * used to force a reconnect to the primary broker after failing over to a secondary * broker. */ + @Override public void resetConnection() { destroy(); }