Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192: Write the child ownership to ServiceUnitStateChannel instead of ZK when handling bundle split #18858

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,9 @@ protected CompletableFuture<Void> internalSplitNamespaceBundleAsync(String bundl
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries));
pulsar().getNamespaceService()
.getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName),
splitBoundaries));
});
}

Expand Down Expand Up @@ -1109,18 +1111,6 @@ private CompletableFuture<NamespaceBundle> findHotBundleAsync(NamespaceName name
.getBundleWithHighestThroughputAsync(namespaceName);
}

/**
 * Resolves the bundle split algorithm to use for a requested algorithm name.
 *
 * <p>Lookup order: the requested name, then the broker's configured default algorithm name, and
 * finally {@code RANGE_EQUALLY_DIVIDE_ALGO} when neither lookup yields an algorithm.
 *
 * @param algorithmName the requested algorithm name
 * @return the first algorithm found in the lookup order above
 */
private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
    NamespaceBundleSplitAlgorithm requested = NamespaceBundleSplitAlgorithm.of(algorithmName);
    if (requested != null) {
        return requested;
    }
    // The requested name did not resolve; try the broker-level default before the hard-coded one.
    NamespaceBundleSplitAlgorithm configured = NamespaceBundleSplitAlgorithm.of(
            pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
    return configured != null ? configured : NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}

protected void internalSetPublishRate(PublishRate maxPublishMessageRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
Expand All @@ -35,7 +37,9 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -48,6 +52,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -60,18 +65,23 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
Expand Down Expand Up @@ -523,8 +533,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {

/**
 * Handles a split event for a service unit.
 *
 * <p>The split is only started when {@code isTargetBroker(data.broker())} is true; otherwise this
 * method does nothing. When the split future completes, its outcome (including any failure cause)
 * is passed to {@code log(...)} together with the service unit and the event data.
 *
 * @param serviceUnit the service unit named in the event
 * @param data the state data carried by the event; its broker decides whether this broker acts
 */
private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.broker())) {
// NOTE(review): two call forms appear back to back below (one without `data` followed by
// tombstoneAsync, one with `data`). This looks like the before/after lines of a change shown
// together -- confirm which one is live.
splitServiceUnit(serviceUnit)
.thenCompose(__ -> tombstoneAsync(serviceUnit))
splitServiceUnit(serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
}
}
Expand Down Expand Up @@ -625,25 +634,107 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
});
}

/**
 * Starts splitting the given service unit and returns a future for the overall outcome.
 *
 * <p>NOTE(review): this span contains two signatures and two bodies (one calling
 * {@code splitAndOwnBundle}, one delegating to {@code splitServiceUnitOnceAndRetry}). It looks like
 * the before/after lines of a change shown together -- confirm which one is live.
 *
 * <p>The delegating form records the start time, resolves the namespace service, bundle factory
 * and bundle for the service unit, and hands them to {@code splitServiceUnitOnceAndRetry} along
 * with a retry counter starting at zero and the completion future that is returned to the caller.
 *
 * @param serviceUnit the service unit to split
 * @param data the state data forwarded to {@code splitServiceUnitOnceAndRetry}
 * @return the completion future passed to {@code splitServiceUnitOnceAndRetry}
 */
private CompletableFuture<Void> splitServiceUnit(String serviceUnit) {
// TODO: after the split we need to write the child ownerships to BSC instead of ZK.
private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnitStateData data) {
// Write the child ownerships to BSC.
// Captured up front so the elapsed split time can be reported later.
long startTime = System.nanoTime();
return pulsar.getNamespaceService()
.splitAndOwnBundle(getNamespaceBundle(serviceUnit),
false,
NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()),
null)
.whenComplete((__, ex) -> {
// Elapsed time in milliseconds, logged on both the success and the failure path.
double splitBundleTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
if (ex == null) {
log.info("Successfully split {} namespace-bundle in {} ms",
serviceUnit, splitBundleTime);
} else {
log.error("Failed to split {} namespace-bundle in {} ms",
serviceUnit, splitBundleTime, ex);
}
});
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
// Completed by splitServiceUnitOnceAndRetry; this is what the caller observes.
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
// Attempt counter handed to splitServiceUnitOnceAndRetry.
final AtomicInteger counter = new AtomicInteger(0);
this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data,
counter, startTime, completionFuture);
return completionFuture;
}

@VisibleForTesting
protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
NamespaceBundleFactory bundleFactory,
NamespaceBundle bundle,
String serviceUnit,
ServiceUnitStateData data,
AtomicInteger counter,
long startTime,
CompletableFuture<Void> completionFuture) {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();

pulsar.getNamespaceService().getSplitBoundary(bundle, null).thenAccept(splitBundlesPair -> {
// Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper.
if (splitBundlesPair == null) {
String msg = format("Bundle %s not found under namespace", serviceUnit);
updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
return;
}
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker());
NamespaceBundles targetNsBundle = splitBundlesPair.getLeft();
List<NamespaceBundle> splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight());
List<NamespaceBundle> successPublishedBundles =
Collections.synchronizedList(new ArrayList<>(splitBundles.size()));
List<CompletableFuture<Void>> futures = new ArrayList<>(splitBundles.size());
for (NamespaceBundle sBundle : splitBundles) {
futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> successPublishedBundles.add(sBundle)));
}
NamespaceName nsname = bundle.getNamespaceObject();
FutureUtil.waitForAll(futures)
.thenCompose(__ -> namespaceService.updateNamespaceBundles(nsname, targetNsBundle))
.thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle))
.thenRun(() -> {
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
updateFuture.complete(splitBundles);
}).exceptionally(e -> {
// Clean the new bundle when has exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about logging the error here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When updateFuture.completeExceptionally(e) is called, the error is logged here: https://github.com/apache/pulsar/pull/18858/files#diff-191ed1df4f5804c8fd6cdaf909cca01d071a110c3e701bdc98352f99e7a92e8eR534

List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (NamespaceBundle sBundle : successPublishedBundles) {
futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {}));
}
FutureUtil.waitForAll(futureList)
.whenComplete((__, ex) -> {
if (ex != null) {
log.warn("Clean new bundles failed,", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the consequences of this failure?

Can you please point me to where we handle this inconsistent state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two cases that lead to this failure:

  1. When pubAsync fails with an exception.
  2. When updateNamespaceBundles or updateNamespaceBundlesForPolicies fails with an exception.

In both cases we won’t write the new bundles to ZK, so when a client does a lookup it will still find the old bundle. However, some of the new bundles may be left in the table view; those unused bundles are deleted from the table view when the broker goes down.

}
updateFuture.completeExceptionally(e);
});
return null;
});
}).exceptionally(e -> {
updateFuture.completeExceptionally(e);
return null;
});

updateFuture.thenAccept(r -> {
// Free the old bundle
tombstoneAsync(serviceUnit).thenRun(() -> {
// Update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
// TODO: Update the load data immediately if needed.
completionFuture.complete(null);
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
log.info("Successfully split {} parent namespace-bundle to {} in {} ms", serviceUnit, r,
splitBundleTime);
}).exceptionally(e -> {
double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime));
String msg = format("Failed to free bundle %s in %s ms, under namespace [%s] with error %s",
bundle.getNamespaceObject().toString(), splitBundleTime, bundle, e.getMessage());
completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
return null;
});
}).exceptionally(ex -> {
// Retry several times on BadVersion
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
if ((throwable instanceof MetadataStoreException.BadVersionException)
&& (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) {
pulsar.getExecutor().schedule(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory,
bundle, serviceUnit, data, counter, startTime, completionFuture), 100, MILLISECONDS);
} else if (throwable instanceof IllegalArgumentException) {
completionFuture.completeExceptionally(throwable);
} else {
// Retry enough, or meet other exception
String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s",
bundle.toString(), counter.get(), throwable.getMessage());
completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
}
return null;
});
}

public void handleMetadataSessionEvent(SessionEvent e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public class NamespaceService implements AutoCloseable {
private final NamespaceBundleFactory bundleFactory;
private final String host;

private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
Expand Down Expand Up @@ -828,18 +828,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
topicStatsMap,
config.getLoadBalancerNamespaceBundleMaxMsgRate(),
config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
config.getFlowOrQpsDifferenceThresholdPercentage());
} else {
bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
}
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);

splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -957,6 +946,61 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
});
}

/**
 * Computes the split boundaries for a bundle and the bundles that result from splitting on them.
 *
 * <p>The split algorithm is always the one resolved from the configured default algorithm name.
 * If the algorithm yields no boundaries ({@code null} or an empty list), this is logged and the
 * returned future completes with {@code null}.
 *
 * @param bundle The bundle to split.
 * @param boundaries The specified positions,
 *                   use for {@link org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}.
 * @return A future of a pair: left is the target namespace bundles, right is the split bundles;
 *         the future completes with {@code null} when no valid boundary was found.
 */
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
        NamespaceBundle bundle, List<Long> boundaries) {
    BundleSplitOption splitOption = getBundleSplitOption(bundle, boundaries, config);
    NamespaceBundleSplitAlgorithm algorithm =
            getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm());
    return algorithm.getSplitBoundary(splitOption).thenCompose(positions -> {
        if (positions != null && !positions.isEmpty()) {
            // N boundaries cut the bundle into N + 1 pieces.
            return pulsar.getNamespaceService().getNamespaceBundleFactory()
                    .splitBundles(bundle, positions.size() + 1, positions);
        }
        LOG.info("[{}] No valid boundary found in {} to split bundle {}",
                bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
        return CompletableFuture.completedFuture(null);
    });
}

/**
 * Builds the option object handed to a split algorithm for the given bundle.
 *
 * <p>When the configured default split algorithm equals {@code FLOW_OR_QPS_EQUALLY_DIVIDE}, the
 * option also carries the bundle's topic stats and the message-rate, bandwidth and
 * difference-threshold settings read from {@code config}; otherwise a plain
 * {@code BundleSplitOption} is returned.
 *
 * @param bundle the bundle to split
 * @param boundaries the specified split positions, passed through to the option
 * @param config the configuration the algorithm name and thresholds are read from
 * @return the split option matching the configured default algorithm
 */
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
                                               List<Long> boundaries,
                                               ServiceConfiguration config) {
    boolean flowOrQpsConfigured = config.getDefaultNamespaceBundleSplitAlgorithm()
            .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE);
    if (!flowOrQpsConfigured) {
        return new BundleSplitOption(this, bundle, boundaries);
    }
    // Topic stats are only fetched for the flow/QPS option, which is the only one that takes them.
    Map<String, TopicStatsImpl> statsByTopic = pulsar.getBrokerService().getTopicStats(bundle);
    return new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
            statsByTopic,
            config.getLoadBalancerNamespaceBundleMaxMsgRate(),
            config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
            config.getFlowOrQpsDifferenceThresholdPercentage());
}

/**
 * Resolves the bundle split algorithm to use for a requested algorithm name.
 *
 * <p>Lookup order: the requested name, then the broker's configured default algorithm name, and
 * finally {@code RANGE_EQUALLY_DIVIDE_ALGO} when neither lookup yields an algorithm.
 *
 * @param algorithmName the requested algorithm name
 * @return the first algorithm found in the lookup order above
 */
public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
    NamespaceBundleSplitAlgorithm byName = NamespaceBundleSplitAlgorithm.of(algorithmName);
    if (byName != null) {
        return byName;
    }
    // The requested name did not resolve; fall back to the configured default, then the constant.
    NamespaceBundleSplitAlgorithm byConfig =
            NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
    if (byConfig != null) {
        return byConfig;
    }
    return NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}

/**
* Update new bundle-range to admin/policies/namespace.
* Update may fail because of concurrent write to Zookeeper.
Expand All @@ -965,7 +1009,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
* @param nsBundles
* @throws Exception
*/
private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
NamespaceBundles nsBundles) {
Objects.requireNonNull(nsname);
Objects.requireNonNull(nsBundles);
Expand Down Expand Up @@ -994,7 +1038,7 @@ private CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName
* @param nsBundles
* @throws Exception
*/
private CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
Objects.requireNonNull(nsname);
Objects.requireNonNull(nsBundles);

Expand Down
Loading