From 220394e5c9d041fd0a96e9a86d6e979afcbe04db Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Tue, 8 Oct 2019 21:11:34 +0900 Subject: [PATCH] If cursor is not durable, close dispatcher when all consumers are removed from subscription --- .../persistent/PersistentSubscription.java | 17 ++++++++++++- .../broker/service/PersistentTopicTest.java | 24 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 084c2d1c9d210..219ec0dee32aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -244,7 +244,22 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well - close(); + this.close().thenRun(() -> { + synchronized (this) { + if (dispatcher != null) { + dispatcher.close().thenRun(() -> { + log.info("[{}][{}] Successfully closed dispatcher for reader", topicName, subName); + }).exceptionally(ex -> { + log.error("[{}][{}] Failed to close dispatcher for reader", topicName, subName, ex); + return null; + }); + } + } + }).exceptionally(exception -> { + log.error("[{}][{}] Failed to close subscription for reader", topicName, subName, exception); + return null; + }); + // when topic closes: it iterates through concurrent-subscription map to close each subscription. so, // topic.remove again try to access same map which creates deadlock. so, execute it in different thread. topic.getBrokerService().pulsar().getExecutor().submit(() ->{ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 092b570c16cd1..ca06f81eb09af 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -539,6 +539,30 @@ public void testAddRemoveConsumer() throws Exception { } } + @Test + public void testAddRemoveConsumerDurableCursor() throws Exception { + doReturn(false).when(cursorMock).isDurable(); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); + + Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest); + + sub.addConsumer(consumer); + assertFalse(sub.getDispatcher().isClosed()); + sub.removeConsumer(consumer); + + // The dispatcher is closed asynchronously + for (int i = 0; i < 100; i++) { + if (sub.getDispatcher().isClosed()) { + break; + } + Thread.sleep(100); + } + assertTrue(sub.getDispatcher().isClosed()); + } + public void testMaxConsumersShared() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false);