From 93fc19a94f0a3693a9c24080c05a43ab5f5720f1 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 9 Dec 2022 15:56:56 +0800 Subject: [PATCH 01/17] Write the child ownerships to BSC instead of ZK when split bundle --- .../channel/ServiceUnitStateChannelImpl.java | 127 +++++++++++++++--- .../broker/namespace/NamespaceService.java | 4 +- .../channel/ServiceUnitStateChannelTest.java | 13 +- 3 files changed, 121 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 38e8afa50f302..1657eff74f30e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static java.lang.String.format; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; @@ -32,6 +33,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -47,22 +49,32 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; +import org.apache.pulsar.common.naming.BundleSplitOption; +import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -410,8 +422,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.broker())) { - splitServiceUnit(serviceUnit) - .thenCompose(__ -> tombstoneAsync(serviceUnit)) + splitServiceUnit(serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } } @@ -509,25 +520,101 @@ private CompletableFuture closeServiceUnit(String serviceUnit) { }); } - private CompletableFuture splitServiceUnit(String serviceUnit) { - // TODO: after the split we need to write the child ownerships to BSC instead of ZK. + private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnitStateData data) { + // Write the child ownerships to BSC. long startTime = System.nanoTime(); - return pulsar.getNamespaceService() - .splitAndOwnBundle(getNamespaceBundle(serviceUnit), - false, - NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()), - null) - .whenComplete((__, ex) -> { - double splitBundleTime = TimeUnit.NANOSECONDS - .toMillis((System.nanoTime() - startTime)); - if (ex == null) { - log.info("Successfully split {} namespace-bundle in {} ms", - serviceUnit, splitBundleTime); - } else { - log.error("Failed to split {} namespace-bundle in {} ms", - serviceUnit, splitBundleTime, ex); - } - }); + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); + NamespaceBundle bundle = getNamespaceBundle(serviceUnit); + + CompletableFuture completionFuture = new CompletableFuture<>(); + CompletableFuture> updateFuture = new CompletableFuture<>(); + + getSplitBoundary(bundle).thenAccept(splittedBundles -> { + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splittedBundles == null) { + String msg = format("Bundle %s not found under namespace", serviceUnit); + log.warn(msg); + updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); + return; + } + List> futures = new ArrayList<>(); + ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); + for (NamespaceBundle sBundle : splittedBundles.getRight()) { + futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {})); + } + NamespaceName nsname = bundle.getNamespaceObject(); + FutureUtil.waitForAll(futures).thenRun(() -> { + namespaceService.updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> + namespaceService.updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft())) + .thenRun(() -> { + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + updateFuture.complete(splittedBundles.getRight()); + }).exceptionally(ex1 -> { + String msg = format("failed to update namespace policies [%s], " + + "NamespaceBundle: %s due to %s", + nsname.toString(), bundle.getBundleRange(), ex1.getMessage()); + log.warn(msg); + updateFuture.completeExceptionally( + new BrokerServiceException.ServiceUnitNotReadyException(msg, ex1.getCause())); + return null; + }); + }).exceptionally(e -> { + updateFuture.completeExceptionally(e); + return null; + }); + }).exceptionally(e -> { + updateFuture.completeExceptionally(e); + return null; + }); + + updateFuture.thenAccept(r -> { + // Free the old bundle + tombstoneAsync(serviceUnit).thenRun(() -> { + // Update bundled_topic cache for load-report-generation + pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); + // TODO: Update the load data if needed. + completionFuture.complete(null); + double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); + log.info("Successfully split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime); + }).exceptionally(e -> { + String msg = format( + "Failed to free bundle %s under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); + double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); + log.error("Failed to split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime, e); + return null; + }); + }).exceptionally(ex -> { + // TODO: Retry when have exception. + completionFuture.completeExceptionally(ex); + return null; + }); + return completionFuture; + } + + private CompletableFuture>> getSplitBoundary(NamespaceBundle bundle) { + ServiceConfiguration config = pulsar.getConfig(); + NamespaceService namespaceService = pulsar.getNamespaceService(); + BundleSplitOption bundleSplitOption; + if (config.getDefaultNamespaceBundleSplitAlgorithm() + .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { + Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); + bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(namespaceService, bundle, null, + topicStatsMap, + config.getLoadBalancerNamespaceBundleMaxMsgRate(), + config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), + config.getFlowOrQpsDifferenceThresholdPercentage()); + } else { + bundleSplitOption = new BundleSplitOption(namespaceService, bundle, null); + } + NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = + NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + CompletableFuture> splitBoundary = + nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); + return splitBoundary.thenCompose(splitBoundaries -> pulsar.getNamespaceService().getNamespaceBundleFactory() + .splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)); } public void handleMetadataSessionEvent(SessionEvent e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 84bce75bf5a73..929c1ca4b28c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -950,7 +950,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, * @param nsBundles * @throws Exception */ - private CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName nsname, + public CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName nsname, NamespaceBundles nsBundles) { Objects.requireNonNull(nsname); Objects.requireNonNull(nsBundles); @@ -979,7 +979,7 @@ private CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName * @param nsBundles * @throws Exception */ - private CompletableFuture updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { + public CompletableFuture updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { Objects.requireNonNull(nsname); Objects.requireNonNull(nsBundles); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index a16c2be6612bd..876dcfebb38e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -455,7 +455,18 @@ public void splitTest() throws Exception { waitUntilNewOwner(channel1, bundle, null); waitUntilNewOwner(channel2, bundle, null); - // TODO: assert child bundle ownerships in the channels. + // Assert child bundle ownerships in the channels. + String childBundle1 = "public/default/0x7fffffff_0xffffffff"; + String childBundle2 = "public/default/0x00000000_0x7fffffff"; + + waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1); + waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1); + waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1); + waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle1).get()); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle2).get()); + assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle1).get()); + assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle2).get()); } @Test(priority = 7) From 55d7a6268c1020694ae449d72e6b4a9656f9eec8 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 9 Dec 2022 16:27:58 +0800 Subject: [PATCH 02/17] Retry when split bundle has BadVersion exception --- .../channel/ServiceUnitStateChannelImpl.java | 91 ++++++++++++------- .../broker/namespace/NamespaceService.java | 2 +- .../channel/ServiceUnitStateChannelTest.java | 55 ++++++++--- 3 files changed, 99 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1657eff74f30e..296ce49da8fe2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance.extensions.channel; import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; @@ -33,6 +34,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -76,6 +78,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -526,13 +529,27 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit NamespaceService namespaceService = pulsar.getNamespaceService(); NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); NamespaceBundle bundle = getNamespaceBundle(serviceUnit); - CompletableFuture completionFuture = new CompletableFuture<>(); + final AtomicInteger counter = new AtomicInteger(NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT); + this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data, + counter, startTime, completionFuture); + return completionFuture; + } + + @VisibleForTesting + protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, + NamespaceBundleFactory bundleFactory, + NamespaceBundle bundle, + String serviceUnit, + ServiceUnitStateData data, + AtomicInteger counter, + long startTime, + CompletableFuture completionFuture) { CompletableFuture> updateFuture = new CompletableFuture<>(); - getSplitBoundary(bundle).thenAccept(splittedBundles -> { + getSplitBoundary(bundle).thenAccept(splitBundles -> { // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splittedBundles == null) { + if (splitBundles == null) { String msg = format("Bundle %s not found under namespace", serviceUnit); log.warn(msg); updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); @@ -540,32 +557,28 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit } List> futures = new ArrayList<>(); ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); - for (NamespaceBundle sBundle : splittedBundles.getRight()) { + for (NamespaceBundle sBundle : splitBundles.getRight()) { futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {})); } NamespaceName nsname = bundle.getNamespaceObject(); - FutureUtil.waitForAll(futures).thenRun(() -> { - namespaceService.updateNamespaceBundles(nsname, splittedBundles.getLeft()).thenCompose(__ -> - namespaceService.updateNamespaceBundlesForPolicies(nsname, splittedBundles.getLeft())) - .thenRun(() -> { - bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - updateFuture.complete(splittedBundles.getRight()); - }).exceptionally(ex1 -> { - String msg = format("failed to update namespace policies [%s], " - + "NamespaceBundle: %s due to %s", - nsname.toString(), bundle.getBundleRange(), ex1.getMessage()); - log.warn(msg); - updateFuture.completeExceptionally( - new BrokerServiceException.ServiceUnitNotReadyException(msg, ex1.getCause())); - return null; - }); - }).exceptionally(e -> { - updateFuture.completeExceptionally(e); - return null; - }); + FutureUtil.waitForAll(futures).thenRun(() -> + namespaceService.updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ -> + namespaceService.updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft())) + .thenRun(() -> { + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + updateFuture.complete(splitBundles.getRight()); + }).exceptionally(ex -> { + log.warn("Failed to update namespace policies [{}], NamespaceBundle: {} due to {}", + nsname, bundle.getBundleRange(), ex.getMessage()); + updateFuture.completeExceptionally(ex); + return null; + })).exceptionally(e -> { + updateFuture.completeExceptionally(e); + return null; + }); }).exceptionally(e -> { updateFuture.completeExceptionally(e); - return null; + return null; }); updateFuture.thenAccept(r -> { @@ -573,27 +586,39 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit tombstoneAsync(serviceUnit).thenRun(() -> { // Update bundled_topic cache for load-report-generation pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); - // TODO: Update the load data if needed. + // TODO: Update the load data immediately if needed. completionFuture.complete(null); double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); - log.info("Successfully split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime); + log.info("Successfully split {} parent namespace-bundle to {} in {} ms", serviceUnit, r, + splitBundleTime); }).exceptionally(e -> { - String msg = format( - "Failed to free bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage()); + String msg = format("Failed to free bundle %s under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), bundle, e.getMessage()); completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); log.error("Failed to split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime, e); return null; }); }).exceptionally(ex -> { - // TODO: Retry when have exception. - completionFuture.completeExceptionally(ex); + // Retry several times on BadVersion + if ((ex.getCause() instanceof MetadataStoreException.BadVersionException) + && (counter.decrementAndGet() >= 0)) { + pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor() + .execute(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, + serviceUnit, data, counter, startTime, completionFuture)), + 100, MILLISECONDS); + } else if (ex instanceof IllegalArgumentException) { + completionFuture.completeExceptionally(ex); + } else { + // Retry enough, or meet other exception + String msg2 = format("Bundle: %s not success update nsBundles, counter %d, reason %s", + bundle.toString(), counter.get(), ex.getMessage()); + log.warn(msg2); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg2)); + } return null; }); - return completionFuture; } - private CompletableFuture>> getSplitBoundary(NamespaceBundle bundle) { ServiceConfiguration config = pulsar.getConfig(); NamespaceService namespaceService = pulsar.getNamespaceService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 929c1ca4b28c7..dd1b83b4b743d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -123,7 +123,7 @@ public class NamespaceService implements AutoCloseable { private final NamespaceBundleFactory bundleFactory; private final String host; - private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7; + public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7; public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor"; public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 876dcfebb38e1..8d17e4a46e942 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -32,9 +32,12 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -54,6 +57,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -62,11 +66,13 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.awaitility.Awaitility; @@ -103,9 +109,9 @@ protected void setup() throws Exception { pulsar1 = pulsar; pulsar2 = startBrokerWithoutAuthorization(getDefaultConf()); - channel1 = new ServiceUnitStateChannelImpl(pulsar1); + channel1 = spy(new ServiceUnitStateChannelImpl(pulsar1)); channel1.start(); - channel2 = new ServiceUnitStateChannelImpl(pulsar2); + channel2 = spy(new ServiceUnitStateChannelImpl(pulsar2)); channel2.start(); lookupServiceAddress1 = (String) FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", true); @@ -440,7 +446,7 @@ public void unloadTestWhenDestBrokerFails() } @Test(priority = 6) - public void splitTest() throws Exception { + public void splitAndRetryTest() throws Exception { channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); @@ -449,12 +455,31 @@ public void splitTest() throws Exception { assertEquals(ownerAddr1, lookupServiceAddress1); assertEquals(ownerAddr2, lookupServiceAddress1); + NamespaceService namespaceService = spy(pulsar1.getNamespaceService()); + CompletableFuture future = new CompletableFuture<>(); + int badVersionExceptionCount = 3; + AtomicInteger count = new AtomicInteger(badVersionExceptionCount); + future.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion")); + doAnswer(invocationOnMock -> { + if (count.decrementAndGet() > 0) { + return future; + } + // Call real method + reset(namespaceService); + return future; + }).when(namespaceService).updateNamespaceBundles(any(), any()); + doReturn(namespaceService).when(pulsar1).getNamespaceService(); + Split split = new Split(bundle, ownerAddr1, new HashMap<>()); channel1.publishSplitEventAsync(split); waitUntilNewOwner(channel1, bundle, null); waitUntilNewOwner(channel2, bundle, null); + // Verify the retry count + verify(((ServiceUnitStateChannelImpl) channel1), times(badVersionExceptionCount + 1)) + .splitServiceUnitOnceAndRetry(any(), any(), any(), any(), any(), any(), anyLong(), any()); + // Assert child bundle ownerships in the channels. String childBundle1 = "public/default/0x7fffffff_0xffffffff"; String childBundle2 = "public/default/0x00000000_0x7fffffff"; @@ -580,8 +605,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -605,8 +630,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -621,8 +646,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -639,8 +664,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -657,8 +682,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(6, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -682,8 +707,8 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); + assertEquals(6, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); From 4a0e2e46e8f2b96ab74b270152324e4b7233c787 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 9 Dec 2022 22:43:27 +0800 Subject: [PATCH 03/17] Rename the msg field --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 296ce49da8fe2..991680179d54d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -611,10 +611,10 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, completionFuture.completeExceptionally(ex); } else { // Retry enough, or meet other exception - String msg2 = format("Bundle: %s not success update nsBundles, counter %d, reason %s", + String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s", bundle.toString(), counter.get(), ex.getMessage()); - log.warn(msg2); - completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg2)); + log.warn(msg); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); } return null; }); From c9f132f703ac3edab31d553e068fa68187d1912a Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Sat, 10 Dec 2022 11:17:28 +0800 Subject: [PATCH 04/17] Reduce unused logs --- .../channel/ServiceUnitStateChannelImpl.java | 13 +++++-------- .../channel/ServiceUnitStateChannelTest.java | 2 +- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 991680179d54d..0309124343824 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -530,7 +530,7 @@ private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnit NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); NamespaceBundle bundle = getNamespaceBundle(serviceUnit); CompletableFuture completionFuture = new CompletableFuture<>(); - final AtomicInteger counter = new AtomicInteger(NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT); + final AtomicInteger counter = new AtomicInteger(0); this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data, counter, startTime, completionFuture); return completionFuture; @@ -551,7 +551,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. if (splitBundles == null) { String msg = format("Bundle %s not found under namespace", serviceUnit); - log.warn(msg); updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); return; } @@ -592,17 +591,16 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, log.info("Successfully split {} parent namespace-bundle to {} in {} ms", serviceUnit, r, splitBundleTime); }).exceptionally(e -> { - String msg = format("Failed to free bundle %s under namespace [%s] with error %s", - bundle.getNamespaceObject().toString(), bundle, e.getMessage()); - completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); - log.error("Failed to split {} namespace-bundle in {} ms", serviceUnit, splitBundleTime, e); + String msg = format("Failed to free bundle %s in %s ms, under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), splitBundleTime, bundle, e.getMessage()); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); return null; }); }).exceptionally(ex -> { // Retry several times on BadVersion if ((ex.getCause() instanceof MetadataStoreException.BadVersionException) - && (counter.decrementAndGet() >= 0)) { + && (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) { pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor() .execute(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data, counter, startTime, completionFuture)), @@ -613,7 +611,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, // Retry enough, or meet other exception String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s", bundle.toString(), counter.get(), ex.getMessage()); - log.warn(msg); completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); } return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 8d17e4a46e942..2b57188a780fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -464,7 +464,7 @@ public void splitAndRetryTest() throws Exception { if (count.decrementAndGet() > 0) { return future; } - // Call real method + // Call the real method reset(namespaceService); return future; }).when(namespaceService).updateNamespaceBundles(any(), any()); From ca2878eb72df049756b5b5708a11fee9ea2a3d77 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 14 Dec 2022 17:43:02 +0800 Subject: [PATCH 05/17] Address reviewer's comments --- .../channel/ServiceUnitStateChannelImpl.java | 62 ++++++------------- .../broker/namespace/NamespaceService.java | 38 ++++++++++++ 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 0309124343824..e4f2171a5714c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -51,10 +51,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; @@ -66,16 +64,12 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.common.naming.BundleSplitOption; -import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; -import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -426,7 +420,14 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.broker())) { splitServiceUnit(serviceUnit, data) - .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + .whenComplete((__, e) -> { + if (e != null) { + // When has exception, change the bundle state back to Splitting -> Owned . + pubAsync(serviceUnit, new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker())); + log(e, serviceUnit, data, null); + } + + }); } } @@ -547,31 +548,28 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, CompletableFuture completionFuture) { CompletableFuture> updateFuture = new CompletableFuture<>(); - getSplitBoundary(bundle).thenAccept(splitBundles -> { + pulsar.getNamespaceService().getSplitBoundary(bundle, null).thenAccept(splitBundlesPair -> { // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. - if (splitBundles == null) { + if (splitBundlesPair == null) { String msg = format("Bundle %s not found under namespace", serviceUnit); updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); return; } List> futures = new ArrayList<>(); - ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); - for (NamespaceBundle sBundle : splitBundles.getRight()) { + ServiceUnitStateData next = new ServiceUnitStateData(Assigned, data.broker()); + NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); + List splitBundles = splitBundlesPair.getRight(); + for (NamespaceBundle sBundle : splitBundles) { futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {})); } NamespaceName nsname = bundle.getNamespaceObject(); - FutureUtil.waitForAll(futures).thenRun(() -> - namespaceService.updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ -> - namespaceService.updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft())) + FutureUtil.waitForAll(futures) + .thenCompose(__ -> namespaceService.updateNamespaceBundles(nsname, targetNsBundle)) + .thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle)) .thenRun(() -> { bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); - updateFuture.complete(splitBundles.getRight()); - }).exceptionally(ex -> { - log.warn("Failed to update namespace policies [{}], NamespaceBundle: {} due to {}", - nsname, bundle.getBundleRange(), ex.getMessage()); - updateFuture.completeExceptionally(ex); - return null; - })).exceptionally(e -> { + updateFuture.complete(splitBundles); + }).exceptionally(e -> { updateFuture.completeExceptionally(e); return null; }); @@ -616,28 +614,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, return null; }); } - private CompletableFuture>> getSplitBoundary(NamespaceBundle bundle) { - ServiceConfiguration config = pulsar.getConfig(); - NamespaceService namespaceService = pulsar.getNamespaceService(); - BundleSplitOption bundleSplitOption; - if (config.getDefaultNamespaceBundleSplitAlgorithm() - .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { - Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); - bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(namespaceService, bundle, null, - topicStatsMap, - config.getLoadBalancerNamespaceBundleMaxMsgRate(), - config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), - config.getFlowOrQpsDifferenceThresholdPercentage()); - } else { - bundleSplitOption = new BundleSplitOption(namespaceService, bundle, null); - } - NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = - NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); - CompletableFuture> splitBoundary = - nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); - return splitBoundary.thenCompose(splitBoundaries -> pulsar.getNamespaceService().getNamespaceBundleFactory() - .splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)); - } public void handleMetadataSessionEvent(SessionEvent e) { if (e == SessionReestablished || e == SessionLost) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index dd1b83b4b743d..576af752a1b04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -942,6 +942,44 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, }); } + /** + * Get the split boundary's. + * + * @param bundle The bundle to split. + * @param boundaries The specified positions, + * use for {@link org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}. + * @return A pair, left is target namespace bundle, right is split bundles. + */ + public CompletableFuture>> getSplitBoundary( + NamespaceBundle bundle, List boundaries) { + ServiceConfiguration config = pulsar.getConfig(); + BundleSplitOption bundleSplitOption; + if (config.getDefaultNamespaceBundleSplitAlgorithm() + .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { + Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); + bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, + topicStatsMap, + config.getLoadBalancerNamespaceBundleMaxMsgRate(), + config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), + config.getFlowOrQpsDifferenceThresholdPercentage()); + } else { + bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); + } + NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = + NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + CompletableFuture> splitBoundary = + nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); + return splitBoundary.thenCompose(splitBoundaries -> { + if (splitBoundaries == null || splitBoundaries.size() == 0) { + LOG.info("[{}] No valid boundary found in {} to split bundle {}", + bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange()); + return CompletableFuture.completedFuture(null); + } + return pulsar.getNamespaceService().getNamespaceBundleFactory() + .splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries); + }); + } + /** * Update new bundle-range to admin/policies/namespace. * Update may fail because of concurrent write to Zookeeper. From e596489a7b0b3e53b696dd4b405fc8f4723b7768 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 14 Dec 2022 18:34:36 +0800 Subject: [PATCH 06/17] Cleanup the new bundle when has exception. --- .../channel/ServiceUnitStateChannelImpl.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index e4f2171a5714c..b74b57b2369fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -559,8 +560,11 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, ServiceUnitStateData next = new ServiceUnitStateData(Assigned, data.broker()); NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); List splitBundles = splitBundlesPair.getRight(); + List successPublishedBundles = new CopyOnWriteArrayList<>(); for (NamespaceBundle sBundle : splitBundles) { - futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> {})); + futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> { + successPublishedBundles.add(sBundle); + })); } NamespaceName nsname = bundle.getNamespaceObject(); FutureUtil.waitForAll(futures) @@ -570,7 +574,18 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); updateFuture.complete(splitBundles); }).exceptionally(e -> { - updateFuture.completeExceptionally(e); + // Clean the new bundle when has exception. + List> futureList = new ArrayList<>(); + for (NamespaceBundle sBundle : successPublishedBundles) { + futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {})); + } + FutureUtil.waitForAll(futureList) + .whenComplete((__, ex) -> { + if (ex != null) { + log.warn("Clean new bundles failed,", ex); + } + updateFuture.completeExceptionally(e); + }); return null; }); }).exceptionally(e -> { From 732fd42a235b4d58a18b8fef88091df38a66ad10 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 14 Dec 2022 18:50:37 +0800 Subject: [PATCH 07/17] Clean the child bundles when test completed --- .../channel/ServiceUnitStateChannelTest.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 2b57188a780fe..81cef85904df8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -492,6 +492,11 @@ public void splitAndRetryTest() throws Exception { assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle2).get()); assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle1).get()); assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle2).get()); + + cleanTableView(channel1, childBundle1); + cleanTableView(channel2, childBundle1); + cleanTableView(channel1, childBundle2); + cleanTableView(channel2, childBundle2); } @Test(priority = 7) @@ -605,7 +610,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); @@ -630,7 +635,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); @@ -646,7 +651,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); @@ -664,7 +669,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); @@ -682,7 +687,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(6, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); @@ -707,7 +712,7 @@ public void handleBrokerDeletionEventTest() assertEquals(0, followerCleanupJobs.size()); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(6, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); + assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); From 19e322cd01f5ace7be371ce262899979383ecdff Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 16 Dec 2022 13:15:01 +0800 Subject: [PATCH 08/17] Use owned state --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index b74b57b2369fc..20de5662773c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -557,7 +557,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, return; } List> futures = new ArrayList<>(); - ServiceUnitStateData next = new ServiceUnitStateData(Assigned, data.broker()); + ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); List splitBundles = splitBundlesPair.getRight(); List successPublishedBundles = new CopyOnWriteArrayList<>(); From 922186f5a5b45c4016a33e4e78e2ffe85856b0e5 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 16 Dec 2022 16:11:21 +0800 Subject: [PATCH 09/17] Use scheduler direct --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 20de5662773c5..8af1730507c86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -614,10 +614,8 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, // Retry several times on BadVersion if ((ex.getCause() instanceof MetadataStoreException.BadVersionException) && (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) { - pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor() - .execute(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, - serviceUnit, data, counter, startTime, completionFuture)), - 100, MILLISECONDS); + pulsar.getExecutor().schedule(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, + bundle, serviceUnit, data, counter, startTime, completionFuture), 100, MILLISECONDS); } else if (ex instanceof IllegalArgumentException) { completionFuture.completeExceptionally(ex); } else { From c99681d39cb23e7492c70e3a5719991fa85236e6 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 4 Jan 2023 11:24:00 +0800 Subject: [PATCH 10/17] Revert handle exception --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 8af1730507c86..621f64ec24d22 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -421,14 +421,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.broker())) { splitServiceUnit(serviceUnit, data) - .whenComplete((__, e) -> { - if (e != null) { - // When has exception, change the bundle state back to Splitting -> Owned . - pubAsync(serviceUnit, new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker())); - log(e, serviceUnit, data, null); - } - - }); + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } } From 568a39b357b3a5dd929edfb987d19586140e10d6 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 31 Jan 2023 17:27:46 +0800 Subject: [PATCH 11/17] Merge master into current branch --- .../channel/ServiceUnitStateChannelImpl.java | 1 + .../channel/ServiceUnitStateChannelTest.java | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 621f64ec24d22..30884415272b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 81cef85904df8..d5839e3880abe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -611,7 +611,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -636,7 +636,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -652,7 +652,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -670,7 +670,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -688,7 +688,7 @@ public void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -713,7 +713,7 @@ public void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); From 38c019b9a9795d235ede921b9a014db9abd5e28d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 3 Feb 2023 16:06:51 +0800 Subject: [PATCH 12/17] Merge master to current branch --- .../channel/ServiceUnitStateChannelTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 8334b67441f6a..fb5519b768a39 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -478,7 +478,7 @@ public void splitAndRetryTest() throws Exception { }).when(namespaceService).updateNamespaceBundles(any(), any()); doReturn(namespaceService).when(pulsar1).getNamespaceService(); - Split split = new Split(bundle, ownerAddr1, new HashMap<>()); + Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>()); channel1.publishSplitEventAsync(split); waitUntilNewOwner(channel1, bundle, null); @@ -496,10 +496,10 @@ public void splitAndRetryTest() throws Exception { waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1); waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1); waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1); - assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle1).get()); - assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(childBundle2).get()); - assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle1).get()); - assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(childBundle2).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle1).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle2).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle1).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle2).get()); cleanTableView(channel1, childBundle1); cleanTableView(channel2, childBundle1); From 563da58ce0608abc01ac763bdf56e589f2e3c25d Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 6 Feb 2023 18:33:57 +0800 Subject: [PATCH 13/17] Refactor getSplitBoundary method --- .../broker/admin/impl/NamespacesBase.java | 16 +---- .../broker/namespace/NamespaceService.java | 58 ++++++++++--------- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9b93752d5e454..c40444859bb69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1028,7 +1028,9 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, - getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)); + pulsar().getNamespaceService() + .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), + splitBoundaries)); }); } @@ -1109,18 +1111,6 @@ private CompletableFuture findHotBundleAsync(NamespaceName name .getBundleWithHighestThroughputAsync(namespaceName); } - private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { - NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); - if (algorithm == null) { - algorithm = NamespaceBundleSplitAlgorithm.of( - pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); - } - if (algorithm == null) { - algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; - } - return algorithm; - } - protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { validateSuperUserAccess(); log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 82ac19d0b3af6..245c3f896af1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -828,18 +828,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, CompletableFuture completionFuture, NamespaceBundleSplitAlgorithm splitAlgorithm, List boundaries) { - BundleSplitOption bundleSplitOption; - if (config.getDefaultNamespaceBundleSplitAlgorithm() - .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { - Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); - bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, - topicStatsMap, - config.getLoadBalancerNamespaceBundleMaxMsgRate(), - config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), - config.getFlowOrQpsDifferenceThresholdPercentage()); - } else { - bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); - } + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); @@ -967,21 +956,9 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, */ public CompletableFuture>> getSplitBoundary( NamespaceBundle bundle, List boundaries) { - ServiceConfiguration config = pulsar.getConfig(); - BundleSplitOption bundleSplitOption; - if (config.getDefaultNamespaceBundleSplitAlgorithm() - .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { - Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); - bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, - topicStatsMap, - config.getLoadBalancerNamespaceBundleMaxMsgRate(), - config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), - config.getFlowOrQpsDifferenceThresholdPercentage()); - } else { - bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); - } + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = - NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm()); CompletableFuture> splitBoundary = nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); return splitBoundary.thenCompose(splitBoundaries -> { @@ -995,6 +972,35 @@ public CompletableFuture>> getSplit }); } + private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle, + List boundaries, + ServiceConfiguration config) { + BundleSplitOption bundleSplitOption; + if (config.getDefaultNamespaceBundleSplitAlgorithm() + .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { + Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); + bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, + topicStatsMap, + config.getLoadBalancerNamespaceBundleMaxMsgRate(), + config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), + config.getFlowOrQpsDifferenceThresholdPercentage()); + } else { + bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); + } + return bundleSplitOption; + } + + public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { + NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + } + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; + } + return algorithm; + } + /** * Update new bundle-range to admin/policies/namespace. * Update may fail because of concurrent write to Zookeeper. From a13d4b3ddf2065d43fae9234b0ac82e17acec6e0 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 6 Feb 2023 18:57:42 +0800 Subject: [PATCH 14/17] Fix test --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 1 + .../extensions/channel/ServiceUnitStateChannelTest.java | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ec9e3a30782f7..8538afeb13cba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -77,6 +77,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index c19cd9d44c41f..327afa3cb8891 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -517,9 +517,8 @@ public void splitAndRetryTest() throws Exception { waitUntilNewOwner(channel1, bundle, null); waitUntilNewOwner(channel2, bundle, null); - // TODO: assert child bundle ownerships in the channels. - validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0); + validateHandlerCounters(channel2, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); // Verify the retry count From 3a7b073233ca7f235d41dd78dcd6cf17d81e291f Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 8 Feb 2023 22:40:00 +0800 Subject: [PATCH 15/17] Use unwrap completion exception when has exception --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 8538afeb13cba..9a3a7c3533ae8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -718,16 +718,17 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, }); }).exceptionally(ex -> { // Retry several times on BadVersion - if ((ex.getCause() instanceof MetadataStoreException.BadVersionException) + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if ((throwable instanceof MetadataStoreException.BadVersionException) && (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) { pulsar.getExecutor().schedule(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data, counter, startTime, completionFuture), 100, MILLISECONDS); - } else if (ex instanceof IllegalArgumentException) { - completionFuture.completeExceptionally(ex); + } else if (throwable instanceof IllegalArgumentException) { + completionFuture.completeExceptionally(throwable); } else { // Retry enough, or meet other exception String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s", - bundle.toString(), counter.get(), ex.getMessage()); + bundle.toString(), counter.get(), throwable.getMessage()); completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); } return null; From 137f893b9ecad087516a935dd7bafba362050172 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 13 Feb 2023 16:22:44 +0800 Subject: [PATCH 16/17] Address review's comment --- .../channel/ServiceUnitStateChannelImpl.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 4daa81625a4c4..bb8d0bdda320b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -39,6 +39,7 @@ import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -46,7 +47,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; @@ -78,8 +78,8 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; @@ -665,15 +665,14 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); return; } - List> futures = new ArrayList<>(); ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); List splitBundles = splitBundlesPair.getRight(); - List successPublishedBundles = new CopyOnWriteArrayList<>(); + List successPublishedBundles = + Collections.synchronizedList(new ArrayList<>(splitBundles.size())); + List> futures = new ArrayList<>(splitBundles.size()); for (NamespaceBundle sBundle : splitBundles) { - futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> { - successPublishedBundles.add(sBundle); - })); + futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> successPublishedBundles.add(sBundle))); } NamespaceName nsname = bundle.getNamespaceObject(); FutureUtil.waitForAll(futures) From 5d440f401f376b018c85a1577b4277d7cbbde180 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 13 Feb 2023 16:52:40 +0800 Subject: [PATCH 17/17] Use unmodifiable list --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index bb8d0bdda320b..d10138bda6805 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -667,7 +667,7 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, } ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); - List splitBundles = splitBundlesPair.getRight(); + List splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight()); List successPublishedBundles = Collections.synchronizedList(new ArrayList<>(splitBundles.size())); List> futures = new ArrayList<>(splitBundles.size());