-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url #18663
Changes from 13 commits
10a866b
0a7b068
e43fc50
1d2a971
94def53
ef8013a
ab7130b
d660951
a75ff44
09d4f76
f2b5759
fc9be0a
e86bdb1
6f94be4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,9 @@ | |
import java.util.SortedSet; | ||
import java.util.TreeSet; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
@@ -56,6 +58,8 @@ | |
import org.apache.pulsar.broker.PulsarServerException; | ||
import org.apache.pulsar.broker.admin.AdminResource; | ||
import org.apache.pulsar.broker.authorization.AuthorizationService; | ||
import org.apache.pulsar.broker.loadbalance.LeaderBroker; | ||
import org.apache.pulsar.broker.lookup.LookupResult; | ||
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; | ||
import org.apache.pulsar.broker.service.Subscription; | ||
import org.apache.pulsar.broker.service.Topic; | ||
|
@@ -878,6 +882,53 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { | |
} | ||
} | ||
|
||
private void validateLeaderBroker() { | ||
if (!this.isLeaderBroker()) { | ||
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); | ||
String leaderBrokerUrl = leaderBroker.getServiceUrl(); | ||
CompletableFuture<LookupResult> result = pulsar().getNamespaceService() | ||
.createLookupResult(leaderBrokerUrl, false, null); | ||
try { | ||
LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS); | ||
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() | ||
: lookupResult.getLookupData().getHttpUrl(); | ||
if (redirectUrl == null) { | ||
log.error("Redirected broker's service url is not configured"); | ||
throw new RestException(Response.Status.PRECONDITION_FAILED, | ||
"Redirected broker's service url is not configured."); | ||
} | ||
URL url = new URL(redirectUrl); | ||
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()) | ||
.port(url.getPort()) | ||
.replaceQueryParam("authoritative", | ||
false).build(); | ||
|
||
// Redirect | ||
if (log.isDebugEnabled()) { | ||
log.debug("Redirecting the request call to leader - {}", redirect); | ||
} | ||
throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); | ||
} catch (MalformedURLException exception) { | ||
log.error("The leader broker url is malformed - {}", leaderBrokerUrl); | ||
throw new RestException(exception); | ||
} catch (ExecutionException | InterruptedException exception) { | ||
log.error("Leader broker not found - {}", leaderBrokerUrl); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ExecutionException may mean many things, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
throw new RestException(exception.getCause()); | ||
} catch (TimeoutException exception) { | ||
log.error("Leader broker not found within timeout - {}", leaderBrokerUrl); | ||
throw new RestException(exception); | ||
} | ||
} | ||
} | ||
|
||
public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) { | ||
if (StringUtils.isBlank(destinationBroker)) { | ||
return; | ||
} | ||
validateLeaderBroker(); | ||
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); | ||
} | ||
|
||
public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) { | ||
return validateSuperUserAccessAsync() | ||
.thenAccept(__ -> { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.pulsar.broker.PulsarServerException; | ||
import org.apache.pulsar.broker.PulsarService; | ||
import org.apache.pulsar.broker.loadbalance.LoadManager; | ||
|
@@ -65,13 +66,13 @@ public LoadManagerReport generateLoadReport() { | |
|
||
@Override | ||
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) { | ||
String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); | ||
String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); | ||
if (!StringUtils.isBlank(affinityBroker)) { | ||
return Optional.of(buildBrokerResourceUnit(affinityBroker)); | ||
} | ||
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit); | ||
return leastLoadedBroker.map(s -> { | ||
String webServiceUrl = getBrokerWebServiceUrl(s); | ||
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl); | ||
return new SimpleResourceUnit(webServiceUrl, | ||
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); | ||
}); | ||
return leastLoadedBroker.map(this::buildBrokerResourceUnit); | ||
} | ||
|
||
private String getBrokerWebServiceUrl(String broker) { | ||
|
@@ -146,4 +147,16 @@ public Set<String> getAvailableBrokers() throws Exception { | |
public CompletableFuture<Set<String>> getAvailableBrokersAsync() { | ||
return loadManager.getAvailableBrokersAsync(); | ||
} | ||
|
||
private SimpleResourceUnit buildBrokerResourceUnit (String broker) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more blank here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
String webServiceUrl = getBrokerWebServiceUrl(broker); | ||
String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl); | ||
return new SimpleResourceUnit(webServiceUrl, | ||
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); | ||
} | ||
|
||
@Override | ||
public String setNamespaceBundleAffinity(String bundle, String broker) { | ||
return loadManager.setNamespaceBundleAffinity(bundle, broker); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to make this async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we make it async, the result could be indeterministic. If the bundle-broker mapping is not set by the time loadmanager loads the bundle, it will unload the bundle onto wrong broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it's dependent and we don't have to make it async here for this admin-api.