Skip to content

Commit

Permalink
Revert "[fix][broker] Topic could be in fenced state forever if delet…
Browse files Browse the repository at this point in the history
…ion fails (#19129)"

This reverts commit 9f0e8e6.
  • Loading branch information
liangyepianzhou committed Feb 8, 2023
1 parent 283f773 commit 660ac36
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
Expand Down Expand Up @@ -56,7 +57,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -127,21 +127,19 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -159,6 +157,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private MetadataStore store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;

Expand Down Expand Up @@ -195,15 +194,17 @@ public void setup() throws Exception {
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
// Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
doReturn(executor).when(pulsar).getOrderedExecutor();
doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
store = new ZKMetadataStore(mockZk);
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, store);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
Expand Down Expand Up @@ -257,11 +258,6 @@ public void teardown() throws Exception {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().get();
}
GracefulExecutorServicesShutdown.initiate()
.timeout(Duration.ZERO)
.shutdown(executor)
.handle().get();
EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
}

@Test
Expand Down Expand Up @@ -1602,45 +1598,6 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
}


@Test
public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {

doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();

// create topic
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.createPartitionedTopic(TopicName.get(successTopicName), new PartitionedTopicMetadata(2));
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
isFencedField.setAccessible(true);
Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);

assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

metadataStore.failConditional(new MetadataStoreException("injected error"), (op, path) -> {
if (op == FaultInjectionMetadataStore.OperationType.PUT
&& path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic")) {
return true;
}
return false;
});
try {
topic.delete().get();
fail();
} catch (ExecutionException e) {
final Throwable t = FutureUtil.unwrapCompletionException(e);
assertTrue(t.getMessage().contains("injected error"));
}
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

}


@Test
public void testFailoverSubscription() throws Exception {
PersistentTopic topic1 = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand Down Expand Up @@ -2356,8 +2313,8 @@ public void testGetReplicationClusters() throws Exception {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), Collections.emptyList());

PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -41,7 +39,6 @@
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;

/**
* Add possibility to inject failures during tests that interact with MetadataStore.
Expand Down Expand Up @@ -151,28 +148,17 @@ public void registerListener(Consumer<Notification> listener) {

@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
return injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
return store.getMetadataCache(clazz, cacheConfig);
}

@Override
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
return injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef, cacheConfig));
return store.getMetadataCache(typeRef, cacheConfig);
}

@Override
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
return injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
}

@SneakyThrows
private <T> MetadataCache<T> injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
if (metadataCache instanceof MetadataCacheImpl) {
FieldUtils.writeField(metadataCache, "store", this, true);
} else {
throw new UnsupportedOperationException("Metadata cache implementation "
+ metadataCache.getClass().getName() + " not supported by FaultInjectionMetadataStore");
}
return metadataCache;
return store.getMetadataCache(serde, cacheConfig);
}

@Override
Expand Down

0 comments on commit 660ac36

Please sign in to comment.