From 9cdc29ad491ffffa1d2d13f2a8c33a173c1e3b1a Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Fri, 7 Oct 2022 11:33:23 +0800 Subject: [PATCH 1/2] [fix][broker]unify time unit at dropping the backlog on a topic --- .../broker/service/BacklogQuotaManager.java | 2 +- .../service/BacklogQuotaManagerTest.java | 56 ++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index b11538a9217e2..93ae777a89e2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -220,7 +220,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = mLedger.getNextValidPosition( PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index fccc7abc66854..9b45c0082e944 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -111,7 +111,7 @@ void setup() throws Exception { config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); - config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2); config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); @@ -527,6 +527,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { client.close(); } + @Test(timeOut = 60000) + public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception { + assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), + new HashMap<>()); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(5) // set limit time as 5 seconds + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), BacklogQuota.BacklogQuotaType.message_age); + PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID(); + final String subName1 = "c1"; + final String subName2 = "c2"; + int numMsgs = 5; + + Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); + Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + TopicStats stats = getTopicStats(topic1); + assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5); + assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); + + // Sleep 5000 mills for first 5 messages. + Thread.sleep(5000l); + numMsgs = 9; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + // The first 5 messages are expired after sleeping 2000 more mills. + Thread.sleep(2000l); + rolloverStats(); + + TopicStats stats2 = getTopicStats(topic1); + // The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not. + Awaitility.await().untilAsserted(() -> { + assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); + assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9); + }); + client.close(); + } + + @Test public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), From 0c9bb9058d340ea6fd857682a02aea590e917277 Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Fri, 7 Oct 2022 17:23:06 +0800 Subject: [PATCH 2/2] [fix][broker]unify time unit at dropping the backlog on a topic --- .../pulsar/broker/service/BacklogQuotaManagerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 9b45c0082e944..e60486b344182 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -111,7 +111,7 @@ void setup() throws Exception { config.setBrokerServicePort(Optional.of(0)); config.setAuthorizationEnabled(false); config.setAuthenticationEnabled(false); - config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA / 2); + config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); @@ -528,7 +528,7 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { } @Test(timeOut = 60000) - public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Exception { + public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), new HashMap<>()); admin.namespaces().setBacklogQuota("prop/ns-quota", @@ -559,7 +559,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Excepti assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); // Sleep 5000 mills for first 5 messages. - Thread.sleep(5000l); + Thread.sleep(5000L); numMsgs = 9; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -568,7 +568,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithoutEviction() throws Excepti } // The first 5 messages are expired after sleeping 2000 more mills. - Thread.sleep(2000l); + Thread.sleep(2000L); rolloverStats(); TopicStats stats2 = getTopicStats(topic1);