Skip to content

Commit

Permalink
Revert "[fix] Combination of autocreate + forced delete of partitione…
Browse files Browse the repository at this point in the history
…d topic with active consumer leaves topic metadata inconsistent. (apache#17308)"

This reverts commit 9529850.
  • Loading branch information
eolivelli committed Sep 9, 2022
1 parent 200f433 commit 6bcf305
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1138,13 +1138,6 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
} else if (TopicName.get(topic).isPartitioned()
&& (getProducers().size() > 0 || getNumberOfConsumers() > 0)
&& getBrokerService().isAllowAutoTopicCreation(topic)) {
// to avoid inconsistent metadata as a result
return FutureUtil.failedFuture(
new TopicBusyException("Partitioned topic has active consumers or producers and "
+ "auto-creation of topic is allowed"));
}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

Expand All @@ -28,25 +26,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand All @@ -65,7 +54,6 @@ protected int numberOfAdditionalBrokers() {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}

@Override
Expand Down Expand Up @@ -134,90 +122,4 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws
Assert.assertEquals(lookupResultSet.size(), 1);
}

@Test
public void testForceDeletePartitionedTopicWithSub() throws Exception {
final int numPartitions = 10;
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));

admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(5)
.build());

RetentionPolicies retention = new RetentionPolicies(10, 10);
admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
final String topic = "persistent://tenant-xyz/ns-abc/topic-"
+ RandomStringUtils.randomAlphabetic(5)
+ "-testDeletePartitionedTopicWithSub";
final String subscriptionName = "sub";
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();

log.info("Creating producer and consumer");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subscriptionName)
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();

log.info("producing messages");
for (int i = 0; i < numPartitions * 100; ++i) {
producer.newMessage()
.key("" + i)
.value("value-" + i)
.send();
}
producer.flush();
producer.close();

log.info("consuming some messages");
for (int i = 0; i < numPartitions * 5; i++) {
Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
}

log.info("trying to delete the topic");
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected PulsarAdminException.NotFoundException");
} catch (PulsarAdminException e) {
assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers"));
}

// check that metadata is still consistent
assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(numPartitions,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertTrue(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("closing producer and consumer");
producer.close();
consumer.close();

log.info("trying to delete the topic again");
admin.topics().deletePartitionedTopic(topic, true);

assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
.stream().filter(t -> t.contains(topic)).count());
assertEquals(0,
pulsar.getPulsarResources().getTopicResources()
.getExistingPartitions(TopicName.getPartitionedTopicName(topic))
.get()
.stream().filter(t -> t.contains(topic)).count());
assertFalse(admin.topics()
.getPartitionedTopicList("tenant-xyz/ns-abc")
.contains(topic));

log.info("trying to create the topic again");
((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,6 @@ public void testSubscribeRate() throws Exception {
pulsarClient.updateServiceUrl(lookupUrl.toString());
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
pulsar.getConfiguration().setAuthorizationEnabled(true);
consumer.close();
admin.topics().deletePartitionedTopic(topicName, true);
admin.namespaces().deleteNamespace(namespace);
admin.tenants().deleteTenant("my-tenants");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -332,12 +331,7 @@ public void topicDeleted(String ignored, boolean partitioned) throws Exception {
p1.send("msg-1");

if (partitioned) {
try {
admin.topics().deletePartitionedTopic(topic, true);
fail("expected error because partitioned topic has active producer");
} catch (PulsarAdminException.ServerSideErrorException e) {
// expected
}
admin.topics().deletePartitionedTopic(topic, true);
} else {
admin.topics().delete(topic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,34 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t
}
}

@Test(timeOut = testTimeout)
public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
final String topicName = "persistent://public/default/issue-9585";
admin.topics().createPartitionedTopic(topicName, 3);
PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(topicName)
.subscriptionName("sub-issue-9585")
.subscribe();

Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
Assert.assertEquals(consumer.getConsumers().size(), 3);

admin.topics().deletePartitionedTopic(topicName, true);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
Assert.assertEquals(consumer.getConsumers().size(), 0);
});

admin.topics().createPartitionedTopic(topicName, 7);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
Assert.assertEquals(consumer.getConsumers().size(), 7);
});
}


@Test(timeOut = testTimeout)
public void testPartitionsUpdatesForMultipleTopics() throws Exception {
final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
Expand Down

This file was deleted.

0 comments on commit 6bcf305

Please sign in to comment.