From cbac501e1e0f653b57e0121bffb7797641cb43f7 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 22 Mar 2023 09:22:13 -0700 Subject: [PATCH] fixed test failure --- conf/client.conf | 4 +- .../reporter/BrokerLoadDataReporter.java | 12 ++++-- .../reporter/TopBundleLoadDataReporter.java | 14 +++++-- .../extensions/models/TopKBundlesTest.java | 14 +++---- .../reporter/BrokerLoadDataReporterTest.java | 38 +++++++++++++++---- .../TopBundleLoadDataReporterTest.java | 33 ++++++++++++---- 6 files changed, 84 insertions(+), 31 deletions(-) diff --git a/conf/client.conf b/conf/client.conf index 8a485e5676c7bf..5c6a84b4b4d798 100644 --- a/conf/client.conf +++ b/conf/client.conf @@ -22,12 +22,12 @@ # URL for Pulsar REST API (for admin operations) # For TLS: # webServiceUrl=https://localhost:8443/ -webServiceUrl=http://localhost:8080/ +webServiceUrl=http://localhost:53508/ # URL for Pulsar Binary Protocol (for produce and consume operations) # For TLS: # brokerServiceUrl=pulsar+ssl://localhost:6651/ -brokerServiceUrl=pulsar://localhost:6650/ +brokerServiceUrl=pulsar://localhost:53509/ # Authentication plugin to authenticate with servers # e.g. for TLS diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java index 43c9fc7cb4aede..5b27c9ac2f26d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporter.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.reporter; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -61,7 +63,9 @@ public class BrokerLoadDataReporter implements LoadDataReporter, private final BrokerLoadData lastData; - private volatile long lastTombstonedAt; + private final ScheduledExecutorService executor; + + private long lastTombstonedAt; private long tombstoneDelayInMillis; @@ -71,6 +75,7 @@ public BrokerLoadDataReporter(PulsarService pulsar, this.brokerLoadDataStore = brokerLoadDataStore; this.lookupServiceAddress = lookupServiceAddress; this.pulsar = pulsar; + this.executor = pulsar.getLoadManagerExecutor(); this.conf = this.pulsar.getConfiguration(); if (SystemUtils.IS_OS_LINUX) { brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar); @@ -176,7 +181,8 @@ protected double percentChange(final double oldValue, final double newValue) { return 100 * Math.abs((oldValue - newValue) / oldValue); } - private void tombstone() { + @VisibleForTesting + protected void tombstone() { var now = System.currentTimeMillis(); if (now - lastTombstonedAt < tombstoneDelayInMillis) { return; @@ -201,7 +207,7 @@ private void tombstone() { @Override public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { - this.pulsar.getLoadManagerExecutor().execute(() -> { + executor.execute(() -> { ServiceUnitState state = ServiceUnitStateData.state(data); switch (state) { case Releasing, Splitting -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java index 9ec02eb6854467..ed410b919b9d94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporter.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.reporter; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; @@ -46,15 +48,18 @@ public class TopBundleLoadDataReporter implements LoadDataReporter bundleLoadDataStore) { this.pulsar = pulsar; + this.executor = pulsar.getLoadManagerExecutor(); this.lookupServiceAddress = lookupServiceAddress; this.bundleLoadDataStore = bundleLoadDataStore; this.lastBundleStatsUpdatedAt = 0; @@ -97,7 +102,8 @@ public CompletableFuture reportAsync(boolean force) { } } - private void tombstone() { + @VisibleForTesting + protected void tombstone() { var now = System.currentTimeMillis(); if (now - lastTombstonedAt < tombstoneDelayInMillis) { return; @@ -110,7 +116,7 @@ private void tombstone() { log.error("Failed to clean broker load data."); lastTombstonedAt = lastSuccessfulTombstonedAt; } else { - boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfig(), log); + boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log); if (debug) { log.info("Cleaned broker load data."); } @@ -121,7 +127,7 @@ private void tombstone() { @Override public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { - this.pulsar.getLoadManagerExecutor().execute(() -> { + executor.execute(() -> { ServiceUnitState state = ServiceUnitStateData.state(data); switch (state) { case Releasing, Splitting -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 9b42163bd664b2..472d44df8906d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -87,15 +87,15 @@ public void testTopBundlesLoadData() { Map bundleStats = new HashMap<>(); var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); - stats1.msgRateIn = 500; + stats1.msgRateIn = 100000; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); - stats2.msgRateIn = 10000; + stats2.msgRateIn = 500; bundleStats.put(bundle2, stats2); NamespaceBundleStats stats3 = new NamespaceBundleStats(); - stats3.msgRateIn = 100000; + stats3.msgRateIn = 10000; bundleStats.put(bundle3, stats3); NamespaceBundleStats stats4 = new NamespaceBundleStats(); @@ -107,8 +107,8 @@ public void testTopBundlesLoadData() { var top1 = topKBundles.getLoadData().getTopBundlesLoadData().get(1); var top2 = topKBundles.getLoadData().getTopBundlesLoadData().get(2); - assertEquals(top0.bundleName(), bundle3); - assertEquals(top1.bundleName(), bundle2); + assertEquals(top0.bundleName(), bundle2); + assertEquals(top1.bundleName(), bundle3); assertEquals(top2.bundleName(), bundle1); } @@ -225,8 +225,8 @@ public void testLoadBalancerSheddingBundlesWithPoliciesEnabledConfig() throws Me var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0); var top1 = topKBundles.getLoadData().getTopBundlesLoadData().get(1); - assertEquals(top0.bundleName(), bundle2); - assertEquals(top1.bundleName(), bundle1); + assertEquals(top0.bundleName(), bundle1); + assertEquals(top1.bundleName(), bundle2); configuration.setLoadBalancerSheddingBundlesWithPoliciesEnabled(false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java index 86e7e895d96eda..5fefd729547de0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/BrokerLoadDataReporterTest.java @@ -23,11 +23,13 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; @@ -40,11 +42,13 @@ import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.PulsarStats; import org.apache.pulsar.broker.stats.BrokerStats; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -59,17 +63,22 @@ public class BrokerLoadDataReporterTest { SystemResourceUsage usage; String broker = "broker1"; String bundle = "bundle1"; + ScheduledExecutorService executor; @BeforeMethod void setup() { config = new ServiceConfiguration(); + config.setLoadBalancerDebugModeEnabled(true); pulsar = mock(PulsarService.class); store = mock(LoadDataStore.class); brokerService = mock(BrokerService.class); pulsarStats = mock(PulsarStats.class); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(config).when(pulsar).getConfiguration(); - doReturn(Executors.newSingleThreadScheduledExecutor()).when(pulsar).getLoadManagerExecutor(); + executor = Executors + .newSingleThreadScheduledExecutor(new + ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager")); + doReturn(executor).when(pulsar).getLoadManagerExecutor(); doReturn(pulsarStats).when(brokerService).getPulsarStats(); brokerStats = new BrokerStats(0); brokerStats.topics = 6; @@ -81,6 +90,7 @@ void setup() { doReturn(pulsarStats).when(brokerService).getPulsarStats(); doReturn(brokerStats).when(pulsarStats).getBrokerStats(); doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any()); + doReturn(CompletableFuture.completedFuture(null)).when(store).removeAsync(any()); usage = new SystemResourceUsage(); usage.setCpu(new ResourceUsage(1.0, 100.0)); @@ -90,6 +100,11 @@ void setup() { usage.setBandwidthOut(new ResourceUsage(4.0, 100.0)); } + @AfterMethod + void shutdown(){ + executor.shutdown(); + } + public void testGenerate() throws IllegalAccessException { try (MockedStatic mockLoadManagerShared = Mockito.mockStatic(LoadManagerShared.class)) { mockLoadManagerShared.when(() -> LoadManagerShared.getSystemResourceUsage(any())).thenReturn(usage); @@ -132,47 +147,54 @@ public void testReport() throws IllegalAccessException { } @Test - public void testTombstone() throws IllegalAccessException { + public void testTombstone() throws IllegalAccessException, InterruptedException { - var target = new BrokerLoadDataReporter(pulsar, broker, store); + var target = spy(new BrokerLoadDataReporter(pulsar, broker, store)); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(1)).tombstone(); verify(store, times(1)).removeAsync(eq(broker)); var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); assertEquals(localData, new BrokerLoadData()); }); - { - target.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null); + target.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null); + Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(2)).tombstone(); verify(store, times(1)).removeAsync(eq(broker)); var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); assertEquals(localData, new BrokerLoadData()); - } + }); FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(3)).tombstone(); verify(store, times(2)).removeAsync(eq(broker)); var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); assertEquals(localData, new BrokerLoadData()); @@ -181,10 +203,10 @@ public void testTombstone() throws IllegalAccessException { target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(4)).tombstone(); verify(store, times(3)).removeAsync(eq(broker)); var localData = (BrokerLoadData) FieldUtils.readDeclaredField(target, "localData", true); assertEquals(localData, new BrokerLoadData()); }); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java index 79bce42fad16e7..82e70582e07f5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/reporter/TopBundleLoadDataReporterTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; @@ -32,6 +33,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; @@ -50,6 +52,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -68,6 +71,7 @@ public class TopBundleLoadDataReporterTest { String bundle2 = "my-tenant/my-namespace2/0x00000000_0x0FFFFFFF"; String bundle = bundle1; String broker = "broker-1"; + ScheduledExecutorService executor; @BeforeMethod void setup() throws MetadataStoreException { @@ -81,15 +85,16 @@ void setup() throws MetadataStoreException { isolationPolicyResources = mock(NamespaceResources.IsolationPolicyResources.class); var namespaceResources = mock(NamespaceResources.class); localPoliciesResources = mock(LocalPoliciesResources.class); + executor = Executors + .newSingleThreadScheduledExecutor(new + ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager")); doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(config).when(pulsar).getConfiguration(); doReturn(pulsarStats).when(brokerService).getPulsarStats(); doReturn(CompletableFuture.completedFuture(null)).when(store).pushAsync(any(), any()); - doReturn(Executors - .newSingleThreadScheduledExecutor(new - ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"))) - .when(pulsar).getLoadManagerExecutor(); + doReturn(CompletableFuture.completedFuture(null)).when(store).removeAsync(any()); + doReturn(executor).when(pulsar).getLoadManagerExecutor(); doReturn(pulsarResources).when(pulsar).getPulsarResources(); doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); @@ -109,6 +114,11 @@ void setup() throws MetadataStoreException { doReturn(bundleStats).when(brokerService).getBundleStats(); } + @AfterMethod + void shutdown(){ + executor.shutdown(); + } + public void testZeroUpdatedAt() { doReturn(0l).when(pulsarStats).getUpdatedAt(); var target = new TopBundleLoadDataReporter(pulsar, "", store); @@ -165,47 +175,56 @@ public void testReport(){ @Test public void testTombstone() throws IllegalAccessException { - var target = new TopBundleLoadDataReporter(pulsar, broker, store); + var target = spy(new TopBundleLoadDataReporter(pulsar, broker, store)); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Assigning, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, broker, VERSION_ID_INIT), null); verify(store, times(0)).removeAsync(eq(broker)); + verify(target, times(0)).tombstone(); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(1)).tombstone(); verify(store, times(1)).removeAsync(eq(broker)); }); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-2", broker, VERSION_ID_INIT), null); - verify(store, times( 1)).removeAsync(eq(broker)); + Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(2)).tombstone(); + verify(store, times(1)).removeAsync(eq(broker)); + }); FieldUtils.writeDeclaredField(target, "tombstoneDelayInMillis", 0, true); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-2", broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(3)).tombstone(); verify(store, times(2)).removeAsync(eq(broker)); }); target.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, broker, VERSION_ID_INIT), null); Awaitility.waitAtMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + verify(target, times(4)).tombstone(); verify(store, times(3)).removeAsync(eq(broker)); }); - } }