Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Topic could be in fenced state forever if deletion fails #19129

Merged
merged 4 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1268,9 +1268,9 @@ public void deleteLedgerComplete(Object ctx) {
topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic",
topic, exception);
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new PersistenceException(exception));
}
Expand All @@ -1289,6 +1289,11 @@ public void deleteLedgerComplete(Object ctx) {
});

return deleteFuture;
}).whenComplete((value, ex) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could use 'exceptionally'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exceptionally swallows the exception so it's not a good fit here

if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to log before calling unfenceTopicToResume

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

unfenceTopicToResume();
}
});
} finally {
lock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
Expand Down Expand Up @@ -131,21 +130,22 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
Expand All @@ -165,7 +165,6 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
private SchemaRegistryService schemaRegistryService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private MetadataStore store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;

Expand Down Expand Up @@ -211,17 +210,15 @@ public void setup() throws Exception {
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());
// Mock metaStore.
ZooKeeper mockZk = createMockZooKeeper();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
doReturn(executor).when(pulsar).getOrderedExecutor();
store = new ZKMetadataStore(mockZk);
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();
doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
// Mock pulsarResources.
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, store);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
doReturn(tsr).when(pulsarResources).getTopicResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
Expand Down Expand Up @@ -268,7 +265,6 @@ public void teardown() throws Exception {
.shutdown(executor)
.handle().get();
EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
store.close();
}

@Test
Expand Down Expand Up @@ -1615,6 +1611,45 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
}


@Test
public void testDeleteTopicDeleteOnMetadataStoreFailed() throws Exception {

doReturn(CompletableFuture.completedFuture(null)).when(ledgerMock).asyncTruncate();

// create topic
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.createPartitionedTopic(TopicName.get(successTopicName), new PartitionedTopicMetadata(2));
PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();

Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
isFencedField.setAccessible(true);
Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);

assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

metadataStore.failConditional(new MetadataStoreException("injected error"), (op, path) -> {
if (op == FaultInjectionMetadataStore.OperationType.PUT
&& path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic")) {
return true;
}
return false;
});
try {
topic.delete().get();
fail();
} catch (ExecutionException e) {
final Throwable t = FutureUtil.unwrapCompletionException(e);
assertTrue(t.getMessage().contains("injected error"));
}
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

}


@Test
public void testFailoverSubscription() throws Exception {
PersistentTopic topic1 = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand Down Expand Up @@ -2330,8 +2365,8 @@ public void testGetReplicationClusters() throws Exception {
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), Collections.emptyList());

PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, metadataStore);
NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
doReturn(nsr).when(pulsarResources).getNamespaceResources();
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -39,6 +41,7 @@
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;

/**
* Add possibility to inject failures during tests that interact with MetadataStore.
Expand Down Expand Up @@ -148,17 +151,28 @@ public void registerListener(Consumer<Notification> listener) {

@Override
public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) {
return store.getMetadataCache(clazz, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(clazz, cacheConfig));
}

@Override
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
return store.getMetadataCache(typeRef, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(typeRef, cacheConfig));
}

@Override
public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) {
return store.getMetadataCache(serde, cacheConfig);
return injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
}

@SneakyThrows
private <T> MetadataCache<T> injectMetadataStoreInMetadataCache(MetadataCache<T> metadataCache) {
if (metadataCache instanceof MetadataCacheImpl) {
FieldUtils.writeField(metadataCache, "store", this, true);
} else {
throw new UnsupportedOperationException("Metadata cache implementation "
+ metadataCache.getClass().getName() + " not supported by FaultInjectionMetadataStore");
}
return metadataCache;
}

@Override
Expand Down