From 98e6e793264601aad9a199b4446ebe0c0dc65424 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 23 Aug 2022 15:49:16 -0700 Subject: [PATCH] fix: Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. --- .../service/persistent/PersistentTopic.java | 7 + .../admin/AdminApiMultiBrokersTest.java | 98 ++++++++++ .../integration/topics/TestTopicDeletion.java | 183 ++++++++++++++++++ 3 files changed, 288 insertions(+) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4777546bc84a64..4bd54a429dfdd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1139,6 +1139,13 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, .map(PersistentSubscription::getName).toList(); return FutureUtil.failedFuture( new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); + } else if (TopicName.get(topic).isPartitioned() + && (getProducers().size() > 0 || getNumberOfConsumers() > 0) + && getBrokerService().isAllowAutoTopicCreation(topic)) { + // to avoid inconsistent metadata as a result + return FutureUtil.failedFuture( + new TopicBusyException("Partitioned topic has active consumers or producers and " + + "auto-creation of topic is allowed")); } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 2cbff955ecff10..5a0bde6f913866 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.admin; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -26,6 +28,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -33,9 +37,16 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -54,6 +65,7 @@ protected int numberOfAdditionalBrokers() { @Override protected void doInitConf() throws Exception { super.doInitConf(); + this.conf.setManagedLedgerMaxEntriesPerLedger(10); } @Override @@ -122,4 +134,90 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws Assert.assertEquals(lookupResultSet.size(), 1); } + @Test + public void testForceDeletePartitionedTopicWithSub() throws Exception { + final int numPartitions = 10; + TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test")); + + admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc", + AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType("partitioned") + .defaultNumPartitions(5) + .build()); + + RetentionPolicies retention = new RetentionPolicies(10, 10); + admin.namespaces().setRetention("tenant-xyz/ns-abc", retention); + final String topic = "persistent://tenant-xyz/ns-abc/topic-" + + RandomStringUtils.randomAlphabetic(5) + + "-testDeletePartitionedTopicWithSub"; + final String subscriptionName = "sub"; + ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); + + log.info("Creating producer and consumer"); + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create(); + + log.info("producing messages"); + for (int i = 0; i < numPartitions * 100; ++i) { + producer.newMessage() + .key("" + i) + .value("value-" + i) + .send(); + } + producer.flush(); + producer.close(); + + log.info("consuming some messages"); + for (int i = 0; i < numPartitions * 5; i++) { + Message m = consumer.receive(1, TimeUnit.MINUTES); + } + + log.info("trying to delete the topic"); + try { + admin.topics().deletePartitionedTopic(topic, true); + fail("expected PulsarAdminException.NotFoundException"); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers")); + } + + // check that metadata is still consistent + assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc") + .stream().filter(t -> t.contains(topic)).count()); + assertEquals(numPartitions, + pulsar.getPulsarResources().getTopicResources() + .getExistingPartitions(TopicName.getPartitionedTopicName(topic)) + .get() + .stream().filter(t -> t.contains(topic)).count()); + assertTrue(admin.topics() + .getPartitionedTopicList("tenant-xyz/ns-abc") + .contains(topic)); + + log.info("closing producer and consumer"); + producer.close(); + consumer.close(); + + log.info("trying to delete the topic again"); + admin.topics().deletePartitionedTopic(topic, true); + + assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc") + .stream().filter(t -> t.contains(topic)).count()); + assertEquals(0, + pulsar.getPulsarResources().getTopicResources() + .getExistingPartitions(TopicName.getPartitionedTopicName(topic)) + .get() + .stream().filter(t -> t.contains(topic)).count()); + assertFalse(admin.topics() + .getPartitionedTopicList("tenant-xyz/ns-abc") + .contains(topic)); + + log.info("trying to create the topic again"); + ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get(); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java new file mode 100644 index 00000000000000..0adb414e8f4f6a --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.topics; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +/** + * Test cases for compaction. + */ +@Slf4j +public class TestTopicDeletion extends PulsarTestSuite { + + final private boolean unload = false; + final private int numBrokers = 2; + + public void setupCluster() throws Exception { + brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10"); + brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false"); + brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false"); + this.setupCluster(""); + } + + protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster( + String clusterName, + PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { + specBuilder.numBrokers(numBrokers); + specBuilder.enableContainerLog(true); + return specBuilder; + } + + @Test(dataProvider = "ServiceUrls", timeOut=300_000) + public void testPartitionedTopicForceDeletion(Supplier serviceUrl) throws Exception { + + log.info("Creating tenant and namespace"); + + final String tenant = "test-partitioned-topic-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/partitioned-topic"; + final int numPartitions = numBrokers * 3; + final int numKeys = numPartitions * 50; + final String subscriptionName = "sub1"; + + this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin"); + + this.createNamespace(namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-retention", "--size", "100M", "--time", "100m", namespace); + + this.createPartitionedTopic(topic, numPartitions); + + try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) { + + log.info("Creating consumer"); + Consumer consumer = client.newConsumer() + .topic(topic) + .subscriptionName(subscriptionName) + .subscribe(); + + log.info("Producing messages"); + try(Producer producer = client.newProducer() + .topic(topic) + .create() + ) { + for (int i = 0; i < numKeys; i++) { + producer.newMessage() + .key("" + i) + .value(("value-" + i).getBytes(UTF_8)) + .sendAsync(); + } + producer.flush(); + log.info("Successfully wrote {} values", numKeys); + } + + log.info("Consuming half of the messages"); + for (int i = 0; i < numKeys / 2; i++) { + Message m = consumer.receive(1, TimeUnit.MINUTES); + log.info("Read value {}", m.getKey()); + } + + if (unload) { + log.info("Unloading topic"); + pulsarCluster.runAdminCommandOnAnyBroker("topics", + "unload", topic); + } + + ContainerExecResult res; + log.info("Deleting the topic"); + try { + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + assertNotEquals(0, res.getExitCode()); + } catch (ContainerExecException e) { + log.info("Second delete failed with ContainerExecException, could be ok", e); + if (!e.getMessage().contains("with error code 1")) { + fail("Expected different error code"); + } + } + + log.info("Close the consumer and delete the topic again"); + consumer.close(); + + res = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "delete-partitioned-topic", "--force", topic); + assertNotEquals(0, res.getExitCode()); + + Thread.sleep(5000); + // should succeed + log.info("Creating the topic again"); + this.createPartitionedTopic(topic, numBrokers * 2); + } + } + + + private ContainerExecResult createTenantName(final String tenantName, + final String allowedClusterName, + final String adminRoleName) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "tenants", "create", "--allowed-clusters", allowedClusterName, + "--admin-roles", adminRoleName, tenantName); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createNamespace(final String Ns) throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", + "create", + "--clusters", + pulsarCluster.getClusterName(), Ns); + assertEquals(0, result.getExitCode()); + return result; + } + + private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions) + throws Exception { + ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker( + "topics", + "create-partitioned-topic", + "--partitions", "" + numPartitions, + partitionedTopicName); + assertEquals(0, result.getExitCode()); + return result; + } + + +}