Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url #18663

Merged
merged 14 commits into from
Dec 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -56,6 +58,8 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -878,6 +882,53 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
}
}

private void validateLeaderBroker() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to make this async

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make it async, the result could be indeterministic. If the bundle-broker mapping is not set by the time loadmanager loads the bundle, it will unload the bundle onto wrong broker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's dependent and we don't have to make it async here for this admin-api.

if (!this.isLeaderBroker()) {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerUrl = leaderBroker.getServiceUrl();
CompletableFuture<LookupResult> result = pulsar().getNamespaceService()
.createLookupResult(leaderBrokerUrl, false, null);
try {
LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
: lookupResult.getLookupData().getHttpUrl();
if (redirectUrl == null) {
log.error("Redirected broker's service url is not configured");
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Redirected broker's service url is not configured.");
}
URL url = new URL(redirectUrl);
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
.port(url.getPort())
.replaceQueryParam("authoritative",
false).build();

// Redirect
if (log.isDebugEnabled()) {
log.debug("Redirecting the request call to leader - {}", redirect);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
} catch (MalformedURLException exception) {
log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
throw new RestException(exception);
} catch (ExecutionException | InterruptedException exception) {
log.error("Leader broker not found - {}", leaderBrokerUrl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionException may mean many things,
We should unwrap the cause

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

throw new RestException(exception.getCause());
} catch (TimeoutException exception) {
log.error("Leader broker not found within timeout - {}", leaderBrokerUrl);
throw new RestException(exception);
}
}
}

public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
if (StringUtils.isBlank(destinationBroker)) {
return;
}
validateLeaderBroker();
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
}

public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception {

CompletableFuture<Set<String>> getAvailableBrokersAsync();

String setNamespaceBundleAffinity(String bundle, String broker);

void stop() throws PulsarServerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,6 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
* @return bundle data
*/
BundleData getBundleDataOrDefault(String bundle);

String setNamespaceBundleAffinity(String bundle, String broker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
Expand All @@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager {
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
private Map<String, String> bundleBrokerAffinityMap;

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -142,4 +147,12 @@ public void stop() throws PulsarServerException {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {

private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
private Map<String, String> bundleBrokerAffinityMap;

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand All @@ -215,7 +216,7 @@ public ModularLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();

this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -1212,4 +1213,13 @@ public List<Metrics> getLoadBalancingMetrics() {

return metricsCollection;
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -65,13 +66,13 @@ public LoadManagerReport generateLoadReport() {

@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null);
if (!StringUtils.isBlank(affinityBroker)) {
return Optional.of(buildBrokerResourceUnit(affinityBroker));
}
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
return leastLoadedBroker.map(s -> {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
return leastLoadedBroker.map(this::buildBrokerResourceUnit);
}

private String getBrokerWebServiceUrl(String broker) {
Expand Down Expand Up @@ -146,4 +147,16 @@ public Set<String> getAvailableBrokers() throws Exception {
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return loadManager.getAvailableBrokersAsync();
}

private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more blank here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

String webServiceUrl = getBrokerWebServiceUrl(broker);
String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
return loadManager.setNamespaceBundleAffinity(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification

private volatile Future<?> updateRankingHandle;

private Map<String, String> bundleBrokerAffinityMap;

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -251,6 +255,7 @@ public Long load(String key) throws Exception {
}
});
this.pulsar = pulsar;
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

public SimpleLoadManagerImpl(PulsarService pulsar) {
Expand Down Expand Up @@ -1443,6 +1448,15 @@ public void doNamespaceBundleSplit() throws Exception {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}

@Override
public void stop() throws PulsarServerException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -692,6 +692,7 @@ private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnit
String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);

if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
newAuthoritative).build();

newAuthoritative).replaceQueryParam("destinationBroker",
null).build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ public void testNamespacesApiRedirects() throws Exception {
doReturn(uri).when(uriInfo).getRequestUri();

namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false);
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null);
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
Expand Down Expand Up @@ -1053,7 +1053,7 @@ public void testUnloadNamespaceWithBundles() throws Exception {
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);
namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
false);
false, null);
verify(response, timeout(5000).times(1)).resume(any(RestException.class));

// cleanup
Expand Down
Loading