Skip to content

Commit

Permalink
[fix][broker]unify time unit at dropping the backlog on a topic (#17957)
Browse files Browse the repository at this point in the history
(cherry picked from commit add77aa)
  • Loading branch information
HQebupt authored and congbobo184 committed Dec 7, 2022
1 parent e952389 commit 9d2b51b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private void dropBacklogForTimeLimit(PersistentTopic persistentTopic, BacklogQuo
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
&& currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,60 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception {
client.close();
}

@Test(timeOut = 60000)
public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
new HashMap<>());
admin.namespaces().setBacklogQuota("prop/ns-quota",
BacklogQuota.builder()
.limitTime(5) // set limit time as 5 seconds
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
.build(), BacklogQuota.BacklogQuotaType.message_age);
PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();

final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
final String subName1 = "c1";
final String subName2 = "c2";
int numMsgs = 5;

Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
byte[] content = new byte[1024];
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}

TopicStats stats = getTopicStats(topic1);
assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5);
assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5);

// Sleep 5000 mills for first 5 messages.
Thread.sleep(5000L);
numMsgs = 9;
for (int i = 0; i < numMsgs; i++) {
producer.send(content);
consumer1.receive();
consumer2.receive();
}

// The first 5 messages are expired after sleeping 2000 more mills.
Thread.sleep(2000L);
rolloverStats();

TopicStats stats2 = getTopicStats(topic1);
// The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not.
Awaitility.await().untilAsserted(() -> {
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9);
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9);
});
client.close();
}


@Test
public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
Expand Down

0 comments on commit 9d2b51b

Please sign in to comment.