Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Handle get owned namespaces admin API in ExtensibleLoadManager #20552

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -37,16 +38,20 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand Down Expand Up @@ -170,7 +175,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private final SplitCounter splitCounter = new SplitCounter();

// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference();
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
// record split metrics
private final AtomicReference<List<Metrics>> splitMetrics = new AtomicReference<>();

Expand All @@ -180,6 +185,24 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
.build();
private final CountDownLatch initWaiter = new CountDownLatch(1);

/**
* Get all the bundles that are owned by this broker.
*/
public Set<NamespaceBundle> getOwnedServiceUnits() {
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
String brokerId = brokerRegistry.getBrokerId();
return entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())
&& stateData.dstBroker().equals(brokerId);
}).map(entry -> {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
}

public enum Role {
Leader,
Follower
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
stateChangeListeners.notify(serviceUnit, data, null);
if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
pulsar.getNamespaceService().onNamespaceBundleOwned(getNamespaceBundle(serviceUnit));
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
closeServiceUnit(serviceUnit);
Expand Down Expand Up @@ -803,12 +804,6 @@ private boolean isTargetBroker(String broker) {
return broker.equals(lookupServiceAddress);
}

private NamespaceBundle getNamespaceBundle(String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
Expand All @@ -829,7 +824,7 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
true,
Expand Down Expand Up @@ -860,7 +855,7 @@ private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnit
long startTime = System.nanoTime();
NamespaceService namespaceService = pulsar.getNamespaceService();
NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
CompletableFuture<Void> completionFuture = new CompletableFuture<>();
Map<String, Optional<String>> bundleToDestBroker = data.splitServiceUnitToDestBroker();
List<Long> boundaries = null;
Expand Down Expand Up @@ -1275,7 +1270,8 @@ private synchronized void doCleanup(String broker) {

private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(getNamespaceBundle(serviceUnit), Set.of(inactiveBroker))
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit), Set.of(inactiveBroker))
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,4 +711,10 @@ public static void refreshBrokerToFailureDomainMap(PulsarService pulsar,
LOG.warn("Failed to get domain-list for cluster {}", e.getMessage());
}
}

public static NamespaceBundle getNamespaceBundle(PulsarService pulsar, String bundle) {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
return pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -757,21 +757,42 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}

// unload namespace bundle
return unloadNamespaceBundle(bundle, config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
return unloadNamespaceBundle(bundle, destinationBroker,
config.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout, timeoutUnit, true);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout, TimeUnit timeoutUnit) {
return unloadNamespaceBundle(bundle, timeout, timeoutUnit, true);
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
return unloadNamespaceBundle(bundle, Optional.empty(), timeout,
timeoutUnit, closeWithoutWaitingClientDisconnect);
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, long timeout,
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
Optional<String> destinationBroker,
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
if (ob == null) {
Expand All @@ -790,24 +811,34 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
.collect(Collectors.toMap(NamespaceBundle::toString,
bundle -> getNamespaceOwnershipStatus(true,
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceObject()))));
return CompletableFuture.completedFuture(statusMap);
}
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
return FutureUtil.waitForAll(futures)
.thenApply(__ -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toMap(bundle -> bundle.getNamespaceBundle().toString(),
bundle -> getNamespaceOwnershipStatus(bundle,
bundle -> getNamespaceOwnershipStatus(bundle.isActive(),
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceBundle().getNamespaceObject()))
))
);
});
}

private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive,
NamespaceIsolationPolicy nsIsolationPolicy) {
NamespaceOwnershipStatus nsOwnedStatus = new NamespaceOwnershipStatus(BrokerAssignment.shared, false,
nsObj.isActive());
isActive);
if (nsIsolationPolicy == null) {
// no matching policy found, this namespace must be an uncontrolled one and using shared broker
return nsOwnedStatus;
Expand Down Expand Up @@ -1103,6 +1134,10 @@ public OwnershipCache getOwnershipCache() {
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, having all these references to a specific implementation (ExtensibleLoadManagerImpl) is a code smell.

We should rely on object oriented programming principals.
I am not going to block this patch, but I think that we are accumulating some tech debt that we will have to pay later

Copy link
Member Author

@Demogorgon314 Demogorgon314 Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we must do some refactoring in the feature... We might need to do some abstract for NamespaceService.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the new vs old LM logic variations in NamespaceService are more than what we originally estimated. Yes, we can define NamespaceServiceExtension extends NamespaceService.

If the community decides to only maintain the extension logic in the future(I assume this wont happen in the near future), I think we can clean the old LMlogic in NamespaceService too.

ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
}
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -568,7 +570,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);

NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get("test")).get();
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
Expand Down Expand Up @@ -964,10 +966,10 @@ public void testDisableBroker() throws Exception {
var pulsar3 = additionalPulsarTestContext.getPulsarService();
ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true));
String topic = "persistent://public/default/test";
String topic = "persistent://" + defaultTestNamespace +"/test";

String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
TopicName topicName = TopicName.get("test");
TopicName topicName = TopicName.get(topic);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!pulsar3.getBrokerServiceUrl().equals(lookupResult1)) {
admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(),
Expand Down Expand Up @@ -1035,6 +1037,52 @@ public void testListTopic() throws Exception {
admin.namespaces().deleteNamespace(namespace, true);
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws PulsarAdminException {
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
assertTrue(ownedServiceUnitsByPulsar1.isEmpty());
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.isEmpty());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
assertTrue(ownedNamespacesByPulsar1.isEmpty());
assertTrue(ownedNamespacesByPulsar2.isEmpty());

String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join();
CompletableFuture<Optional<BrokerLookupData>> owner = primaryLoadManager.assign(Optional.empty(), bundle);
assertFalse(owner.join().isEmpty());

BrokerLookupData brokerLookupData = owner.join().get();
if (brokerLookupData.getWebServiceUrl().equals(pulsar1.getWebServiceAddress())) {
assertOwnedServiceUnits(pulsar1, primaryLoadManager, bundle);
} else {
assertOwnedServiceUnits(pulsar2, secondaryLoadManager, bundle);
}
}

private void assertOwnedServiceUnits(
PulsarService pulsar,
ExtensibleLoadManagerImpl extensibleLoadManager,
NamespaceBundle bundle) throws PulsarAdminException {
Awaitility.await().untilAsserted(() -> {
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar.getLookupServiceAddress());
assertTrue(ownedNamespaces.containsKey(bundle.toString()));
NamespaceOwnershipStatus status = ownedNamespaces.get(bundle.toString());
assertTrue(status.is_active);
assertFalse(status.is_controlled);
assertEquals(status.broker_assignment, BrokerAssignment.shared);
}

private static abstract class MockBrokerFilter implements BrokerFilter {

@Override
Expand Down