-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192 Improved Auto Unload Logic #19813
[improve][broker] PIP-192 Improved Auto Unload Logic #19813
Conversation
Please pause the review. There will be more updates. |
1af3e15
to
31a3c3c
Compare
Please continue the review. |
Please resolve the conflicts and rebase it into the master branch. |
Yes, I will rebase after the pr for excluding bundles with policies. |
Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare( | ||
a.getValue().getWeightedMaxEMA(), | ||
b.getValue().getWeightedMaxEMA())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
Collections.sort(brokersSortedByLoad, (a, b) -> Double.compare( | |
a.getValue().getWeightedMaxEMA(), | |
b.getValue().getWeightedMaxEMA())); | |
brokersSortedByLoad.sort(Comparator.comparingDouble(a -> a.getValue().getWeightedMaxEMA())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -134,25 +143,29 @@ public void init() throws MetadataStoreException { | |||
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); | |||
return new NamespaceBundle(NamespaceName.get(namespace), hashRange, factory); | |||
}).when(factory).getBundle(anyString(), anyString()); | |||
doReturn(true).when(antiAffinityGroupPolicyHelper).canUnload(any(), any(), any(), any()); | |||
doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doReturn(new AtomicReference(loadManagerWrapper)).when(pulsar).getLoadManager(); | |
doReturn(new AtomicReference<>(loadManagerWrapper)).when(pulsar).getLoadManager(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -567,8 +616,7 @@ private void setIsolationPolicies(SimpleResourceAllocationPolicies policies, | |||
}).when(policies).shouldFailoverToSecondaries(eq(namespaceName), anyInt()); | |||
} | |||
|
|||
private PulsarService getMockPulsar() { | |||
var pulsar = mock(PulsarService.class); | |||
private PulsarService getMockPulsar(PulsarService pulsar) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is never used right now. Please remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -589,8 +637,9 @@ private PulsarService getMockPulsar() { | |||
|
|||
@Test | |||
public void testBundlesWithAntiAffinityGroup() throws IllegalAccessException, MetadataStoreException { | |||
var pulsar = getMockPulsar(); | |||
TransferShedder transferShedder = new TransferShedder(pulsar, antiAffinityGroupPolicyHelper); | |||
//var pulsar = getMockPulsar(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -66,7 +66,7 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) { | |||
topk = Math.min(topk, arr.size()); | |||
partitionSort(arr, topk); | |||
|
|||
for (int i = 0; i < topk; i++) { | |||
for (int i = topk - 1; i >= 0; i--) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why reverse the topK bundles order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to transfer lower-loaded ones first to meet the target threshold. If the highest is too much traffic, we can't unload at all.
31a3c3c
to
0ca5ae3
Compare
0ca5ae3
to
f310c4f
Compare
Rebase is done. Please continue the review. |
d619eb9
to
cbac501
Compare
cbac501
to
1cc8aab
Compare
|
||
@Override | ||
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { | ||
executor.execute(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to use executor here? I did not see any block operating here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention was to control this handleEvents
in a single-threaded manner. I agree that we don't have to use executor here. Removing it.
@@ -156,6 +159,13 @@ public void start() { | |||
task = loadManagerExecutor.scheduleAtFixedRate(() -> { | |||
try { | |||
execute(); | |||
if (conf.isLoadBalancerDebugModeEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we use debugMode
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
private LoadDataStore<BrokerLoadData> loadDataStore; | ||
|
||
private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; | |
private List<Map.Entry<String, BrokerLoadData>> brokersSortedByLoad; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
...in/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
Show resolved
Hide resolved
a9b4171
to
459631b
Compare
@Demogorgon314 can we merge this pr? |
Master Issue: Master Issue: #16691, #18099
Motivation
Raising a PR to implement Master Issue: #16691, #18099
We want to reduce unload frequencies from flaky traffic.
Modifications
This PR
loadBalancerSheddingConditionHitCountThreshold
to further restrict shedding conditions based on the hit count.loadBalanceSheddingDelayInSeconds
value from 600 to 180, as 10 mins are too long. 3 mins can be long enough to catch the new load after unloads.loadBalancerBundleLoadReportPercentage
toloadBalancerMaxNumberOfBundlesInBundleLoadReport
to make the topk bundle count absolute instead of relative.loadBalancerNamespaceBundleSplitConditionThreshold
toloadBalancerNamespaceBundleSplitConditionHitCountThreshold
to be consistent with*ConditionHitCountThreshold
.loadBalancerMaxNumberOfBrokerTransfersPerCycle
toloadBalancerMaxNumberOfBrokerSheddingPerCycle
.msgThroughputEMA
in BrokerLoadData to smooth the broker throughput info.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
We will have separate PRs to update the Doc later.
Matching PR in forked repository
PR in forked repository: heesung-sn#39