Skip to content

Commit

Permalink
fixed test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Mar 22, 2023
1 parent 7e157e8 commit cbac501
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 31 deletions.
4 changes: 2 additions & 2 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
# URL for Pulsar REST API (for admin operations)
# For TLS:
# webServiceUrl=https://localhost:8443/
webServiceUrl=http://localhost:8080/
webServiceUrl=http://localhost:53508/

# URL for Pulsar Binary Protocol (for produce and consume operations)
# For TLS:
# brokerServiceUrl=pulsar+ssl://localhost:6651/
brokerServiceUrl=pulsar://localhost:6650/
brokerServiceUrl=pulsar://localhost:53509/

# Authentication plugin to authenticate with servers
# e.g. for TLS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -61,7 +63,9 @@ public class BrokerLoadDataReporter implements LoadDataReporter<BrokerLoadData>,

private final BrokerLoadData lastData;

private volatile long lastTombstonedAt;
private final ScheduledExecutorService executor;

private long lastTombstonedAt;

private long tombstoneDelayInMillis;

Expand All @@ -71,6 +75,7 @@ public BrokerLoadDataReporter(PulsarService pulsar,
this.brokerLoadDataStore = brokerLoadDataStore;
this.lookupServiceAddress = lookupServiceAddress;
this.pulsar = pulsar;
this.executor = pulsar.getLoadManagerExecutor();
this.conf = this.pulsar.getConfiguration();
if (SystemUtils.IS_OS_LINUX) {
brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
Expand Down Expand Up @@ -176,7 +181,8 @@ protected double percentChange(final double oldValue, final double newValue) {
return 100 * Math.abs((oldValue - newValue) / oldValue);
}

private void tombstone() {
@VisibleForTesting
protected void tombstone() {
var now = System.currentTimeMillis();
if (now - lastTombstonedAt < tombstoneDelayInMillis) {
return;
Expand All @@ -201,7 +207,7 @@ private void tombstone() {

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
this.pulsar.getLoadManagerExecutor().execute(() -> {
executor.execute(() -> {
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Releasing, Splitting -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.reporter;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -46,15 +48,18 @@ public class TopBundleLoadDataReporter implements LoadDataReporter<TopBundlesLoa

private final TopKBundles topKBundles;

private final ScheduledExecutorService executor;

private long lastBundleStatsUpdatedAt;

private long lastTombstonedAt;
private volatile long tombstoneDelayInMillis;
private long tombstoneDelayInMillis;

public TopBundleLoadDataReporter(PulsarService pulsar,
String lookupServiceAddress,
LoadDataStore<TopBundlesLoadData> bundleLoadDataStore) {
this.pulsar = pulsar;
this.executor = pulsar.getLoadManagerExecutor();
this.lookupServiceAddress = lookupServiceAddress;
this.bundleLoadDataStore = bundleLoadDataStore;
this.lastBundleStatsUpdatedAt = 0;
Expand Down Expand Up @@ -97,7 +102,8 @@ public CompletableFuture<Void> reportAsync(boolean force) {
}
}

private void tombstone() {
@VisibleForTesting
protected void tombstone() {
var now = System.currentTimeMillis();
if (now - lastTombstonedAt < tombstoneDelayInMillis) {
return;
Expand All @@ -110,7 +116,7 @@ private void tombstone() {
log.error("Failed to clean broker load data.");
lastTombstonedAt = lastSuccessfulTombstonedAt;
} else {
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log);
boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log);
if (debug) {
log.info("Cleaned broker load data.");
}
Expand All @@ -121,7 +127,7 @@ private void tombstone() {

@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
this.pulsar.getLoadManagerExecutor().execute(() -> {
executor.execute(() -> {
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Releasing, Splitting -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ public void testTopBundlesLoadData() {
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
stats1.msgRateIn = 100000;
bundleStats.put(bundle1, stats1);

NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
stats2.msgRateIn = 500;
bundleStats.put(bundle2, stats2);

NamespaceBundleStats stats3 = new NamespaceBundleStats();
stats3.msgRateIn = 100000;
stats3.msgRateIn = 10000;
bundleStats.put(bundle3, stats3);

NamespaceBundleStats stats4 = new NamespaceBundleStats();
Expand All @@ -107,8 +107,8 @@ public void testTopBundlesLoadData() {
var top1 = topKBundles.getLoadData().getTopBundlesLoadData().get(1);
var top2 = topKBundles.getLoadData().getTopBundlesLoadData().get(2);

assertEquals(top0.bundleName(), bundle3);
assertEquals(top1.bundleName(), bundle2);
assertEquals(top0.bundleName(), bundle2);
assertEquals(top1.bundleName(), bundle3);
assertEquals(top2.bundleName(), bundle1);
}

Expand Down Expand Up @@ -225,8 +225,8 @@ public void testLoadBalancerSheddingBundlesWithPoliciesEnabledConfig() throws Me
var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0);
var top1 = topKBundles.getLoadData().getTopBundlesLoadData().get(1);

assertEquals(top0.bundleName(), bundle2);
assertEquals(top1.bundleName(), bundle1);
assertEquals(top0.bundleName(), bundle1);
assertEquals(top1.bundleName(), bundle2);

configuration.setLoadBalancerSheddingBundlesWithPoliciesEnabled(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -40,11 +42,13 @@
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.stats.BrokerStats;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -59,17 +63,22 @@ public class BrokerLoadDataReporterTest {
SystemResourceUsage usage;
String broker = "broker1";
String bundle = "bundle1";
ScheduledExecutorService executor;

@BeforeMethod
void setup() {
config = new ServiceConfiguration();
config.setLoadBalancerDebugModeEnabled(true);
pulsar = mock(PulsarService.class);
store = mock(LoadDataStore.class);
brokerService = mock(BrokerService.class);
pulsarStats = mock(PulsarStats.class);
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(config).when(pulsar).getConfiguration();
doReturn(Executors.newSingleThreadScheduledExecutor()).when(pulsar).getLoadManagerExecutor();
executor = Executors
.newSingleThreadScheduledExecutor(new
ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
doReturn(executor).when(pulsar).getLoadManagerExecutor();
doReturn(pulsarStats).when(brokerService).getPulsarStats();
brokerStats = new BrokerStats(0);
brokerStats.topics = 6;
Expand All @@ -81,6 +90,7 @@ void setup() {
doReturn(pulsarStats).when(brokerService).getPulsarStats();
doReturn(brokerStats).when(pulsarStats).getBrokerStats();
doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(store).removeAsync(any());

usage = new SystemResourceUsage();
usage.setCpu(new ResourceUsage(1.0, 100.0));
Expand All @@ -90,6 +100,11 @@ void setup() {
usage.setBandwidthOut(new ResourceUsage(4.0, 100.0));
}

@AfterMethod
void shutdown(){
executor.shutdown();
}

public void testGenerate() throws IllegalAccessException {
try (MockedStatic<LoadManagerShared> mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) {
mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage);
Expand Down Expand Up @@ -132,47 +147,54 @@ public void testReport() throws IllegalAccessException {
}

@Test
public void testTombstone() throws IllegalAccessException {
public void testTombstone() throws IllegalAccessException, InterruptedException {

var target = new BrokerLoadDataReporter(pulsar, broker, store);
var target = spy(new BrokerLoadDataReporter(pulsar, broker, store));

target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Assigning, broker, VERSION_ID_INIT), null);
verify(store, times(0)).removeAsync(eq(broker));
verify(target, times(0)).tombstone();

target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, broker, VERSION_ID_INIT), null);
verify(store, times(0)).removeAsync(eq(broker));
verify(target, times(0)).tombstone();


target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, broker, VERSION_ID_INIT), null);
verify(store, times(0)).removeAsync(eq(broker));
verify(target, times(0)).tombstone();

target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, broker, VERSION_ID_INIT), null);
verify(store, times(0)).removeAsync(eq(broker));
verify(target, times(0)).tombstone();

target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
verify(target, times(1)).tombstone();
verify(store, times(1)).removeAsync(eq(broker));
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
assertEquals(localData, new BrokerLoadData());
});

{
target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null);
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
verify(target, times(2)).tombstone();
verify(store, times(1)).removeAsync(eq(broker));
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
assertEquals(localData, new BrokerLoadData());
}
});

FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true);
target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, VERSION_ID_INIT), null);
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
verify(target, times(3)).tombstone();
verify(store, times(2)).removeAsync(eq(broker));
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
assertEquals(localData, new BrokerLoadData());
Expand All @@ -181,10 +203,10 @@ public void testTombstone() throws IllegalAccessException {
target.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, broker, VERSION_ID_INIT), null);
Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
verify(target, times(4)).tombstone();
verify(store, times(3)).removeAsync(eq(broker));
var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true);
assertEquals(localData, new BrokerLoadData());
});

}
}
Loading

0 comments on commit cbac501

Please sign in to comment.