Skip to content

Commit

Permalink
[fix][broker] Topic could be in fenced state forever if deletion fails
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Jan 4, 2023
1 parent 8790ed1 commit a6516a8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,9 +1268,9 @@ public void deleteLedgerComplete(Object ctx) {
topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic",
topic, exception);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new PersistenceException(exception));
}
Expand All @@ -1289,6 +1289,11 @@ public void deleteLedgerComplete(Object ctx) {
});

return deleteFuture;
}).whenComplete((value, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
}
});
} finally {
lock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
Expand Down Expand Up @@ -131,21 +130,22 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
Expand All @@ -165,7 +165,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
private SchemaRegistryService schemaRegistryService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private MetadataStore store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;

Expand Down Expand Up @@ -211,17 +210,15 @@ public void setup() throws Exception {
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
// Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
doReturn(executor).when(pulsar).getOrderedExecutor();
store = new ZKMetadataStore(mockZk);
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();
doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, store);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
Expand Down Expand Up @@ -268,7 +265,6 @@ public void teardown() throws Exception {
.shutdown(executor)
.handle().get();
EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
store.close();
}

@Test
Expand Down Expand Up @@ -1615,6 +1611,45 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
}


/**
 * Checks that a failed {@code topic.delete()} leaves the topic neither fenced nor
 * closing/deleting.
 *
 * <p>A conditional fault is registered on the metadata store for PUT operations on the
 * partitioned-topic metadata path, the delete is expected to fail with the injected error,
 * and the {@code isFenced} / {@code isClosingOrDeleting} flags are read reflectively before
 * and after the attempt.
 */
@Test
public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {
    // Ledger truncation is stubbed to complete immediately.
    doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();

    // Register the partitioned-topic metadata (2 partitions), then load the topic from the broker.
    brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
            .createPartitionedTopic(TopicName.get(successTopicName), new PartitionedTopicMetadata(2));
    PersistentTopic persistentTopic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

    // Both state flags are non-public, so they are inspected through reflection.
    Field fencedFlag = AbstractTopic.class.getDeclaredField("isFenced");
    fencedFlag.setAccessible(true);
    Field closingOrDeletingFlag = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
    closingOrDeletingFlag.setAccessible(true);

    // Baseline: a freshly loaded topic is neither fenced nor being closed/deleted.
    assertFalse((boolean) fencedFlag.get(persistentTopic));
    assertFalse((boolean) closingOrDeletingFlag.get(persistentTopic));

    // Inject an error for PUTs that target this topic's partitioned-topic metadata path.
    metadataStore.failConditional(new MetadataStoreException("injected error"),
            (op, path) -> op == FaultInjectionMetadataStore.OperationType.PUT
                    && path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));

    try {
        persistentTopic.delete().get();
        // Reaching this line means the delete unexpectedly succeeded.
        fail();
    } catch (ExecutionException executionException) {
        final Throwable cause = FutureUtil.unwrapCompletionException(executionException);
        assertTrue(cause.getMessage().contains("injected error"));
    }

    // After the failed delete the topic must be back in its usable state.
    assertFalse((boolean) fencedFlag.get(persistentTopic));
    assertFalse((boolean) closingOrDeletingFlag.get(persistentTopic));
}


@Test
public void testFailoverSubscription() throws Exception {
PersistentTopic topic1 = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand Down Expand Up @@ -2330,8 +2365,8 @@ public void testGetReplicationClusters() throws Exception {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), Collections.emptyList());

PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -39,6 +41,7 @@
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;

/**
* Add possibility to inject failures during tests that interact with MetadataStore.
Expand Down Expand Up @@ -148,17 +151,28 @@ public void registerListener(Consumer<Notification> listener) {

/**
 * Returns a {@link MetadataCache} for {@code clazz}, obtained from the wrapped {@code store}
 * with the supplied {@code cacheConfig}.
 */
@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
// NOTE(review): two consecutive returns are shown here. Presumably the first is the removed
// (pre-change) line of the diff and the second is its replacement, which additionally passes
// the cache through injectMetadataStoreInMetadataCache; the +/- markers appear to have been
// lost -- confirm against the commit.
return store.getMetadataCache(clazz, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
}

/**
 * Returns a {@link MetadataCache} for the type described by {@code typeRef}, obtained from the
 * wrapped {@code store} with the supplied {@code cacheConfig}.
 */
@Override
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
// NOTE(review): two consecutive returns are shown here. Presumably the first is the removed
// (pre-change) line of the diff and the second is its replacement, which additionally passes
// the cache through injectMetadataStoreInMetadataCache; the +/- markers appear to have been
// lost -- confirm against the commit.
return store.getMetadataCache(typeRef, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef, cacheConfig));
}

/**
 * Returns a {@link MetadataCache} that uses the given {@code serde}, obtained from the wrapped
 * {@code store} with the supplied {@code cacheConfig}.
 */
@Override
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
// NOTE(review): two consecutive returns are shown here. Presumably the first is the removed
// (pre-change) line of the diff and the second is its replacement, which additionally passes
// the cache through injectMetadataStoreInMetadataCache; the +/- markers appear to have been
// lost -- confirm against the commit.
return store.getMetadataCache(serde, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
}

/**
 * Overwrites the {@code store} field of the given cache with this instance and returns the
 * same cache object.
 *
 * <p>Only {@link MetadataCacheImpl} instances are accepted, because the field is written
 * reflectively by name.
 *
 * @param metadataCache cache produced by the wrapped store
 * @return {@code metadataCache} itself, after its {@code store} field has been replaced
 * @throws UnsupportedOperationException if the cache is not a {@link MetadataCacheImpl}
 */
@SneakyThrows
private <T> MetadataCache<T> injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
    // Guard: reject any cache implementation other than MetadataCacheImpl.
    if (!(metadataCache instanceof MetadataCacheImpl)) {
        throw new UnsupportedOperationException("Metadata cache implementation "
                + metadataCache.getClass().getName() + " not supported by FaultInjectionMetadataStore");
    }
    // The final 'true' argument forces access, so the write works on a non-public field.
    FieldUtils.writeField(metadataCache, "store", this, true);
    return metadataCache;
}

@Override
Expand Down

0 comments on commit a6516a8

Please sign in to comment.