From e2290b3083d1e822230e5b01fc44a429c08955e0 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 21 Feb 2023 18:53:02 -0800 Subject: [PATCH] updated monitor logic --- .../extensions/channel/ServiceUnitState.java | 12 +- .../channel/ServiceUnitStateChannelImpl.java | 290 ++++++++----- .../ServiceUnitStateCompactionStrategy.java | 14 +- .../channel/ServiceUnitStateData.java | 15 +- .../ExtensibleLoadManagerImplTest.java | 31 +- .../channel/ServiceUnitStateChannelTest.java | 380 ++++++++++++++---- ...erviceUnitStateCompactionStrategyTest.java | 18 +- .../channel/ServiceUnitStateTest.java | 10 +- .../ServiceUnitStateCompactionTest.java | 24 +- 9 files changed, 574 insertions(+), 220 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java index 40dd87c85d711..43a3b0a3e325b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java @@ -42,15 +42,15 @@ public enum ServiceUnitState { Deleted; // deleted in the system (semi-terminal state) - private static Map> validTransitions = Map.of( + private static final Map> validTransitions = Map.of( // (Init -> all states) transitions are required // when the topic is compacted in the middle of assign, transfer or split. - Init, Set.of(Free, Owned, Assigned, Released, Splitting, Deleted, Init), + Init, Set.of(Free, Owned, Assigned, Released, Splitting, Deleted), Free, Set.of(Assigned, Init), - Owned, Set.of(Assigned, Splitting, Released, Init), - Assigned, Set.of(Owned, Released, Init), - Released, Set.of(Owned, Free, Init), - Splitting, Set.of(Deleted, Init), + Owned, Set.of(Assigned, Splitting, Released), + Assigned, Set.of(Owned, Released), + Released, Set.of(Owned, Free), + Splitting, Set.of(Deleted), Deleted, Set.of(Init) ); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 9ff272c1e1f76..f1656bbadef5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -38,6 +38,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Jittery; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; import com.google.common.annotations.VisibleForTesting; @@ -68,8 +69,11 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -103,7 +107,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60; public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately - private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private final PulsarService pulsar; @@ -113,6 +116,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final String lookupServiceAddress; private final ConcurrentOpenHashMap> cleanupJobs; private final LeaderElectionService leaderElectionService; + private BrokerSelectionStrategy brokerSelector; private BrokerRegistry brokerRegistry; private TableView tableview; private Producer producer; @@ -126,17 +130,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private long maxCleanupDelayTimeInSecs; private long minCleanupDelayTimeInSecs; // cleanup metrics - private long totalCleanupCnt = 0; - private long totalBrokerCleanupTombstoneCnt = 0; - private long totalServiceUnitCleanupTombstoneCnt = 0; + private long totalInactiveBrokerCleanupCnt = 0; + private long totalServiceUnitTombstoneCleanupCnt = 0; + + private long totalOrphanServiceUnitCleanupCnt = 0; private AtomicLong totalCleanupErrorCnt = new AtomicLong(); - private long totalCleanupScheduledCnt = 0; - private long totalCleanupIgnoredCnt = 0; - private long totalCleanupCancelledCnt = 0; + private long totalInactiveBrokerCleanupScheduledCnt = 0; + private long totalInactiveBrokerCleanupIgnoredCnt = 0; + private long totalInactiveBrokerCleanupCancelledCnt = 0; private volatile ChannelState channelState; - private AtomicLong totalBundleSplitRetryCount = new AtomicLong(); - public enum EventType { Assign, Split, @@ -246,14 +249,13 @@ public synchronized void start() throws PulsarServerException { boolean debug = debug(); try { this.brokerRegistry = getBrokerRegistry(); - this.brokerRegistry.addListener((broker, type) -> { - handleBrokerRegistrationEvent(broker, type); - }); + this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent); leaderElectionService.start(); this.channelState = LeaderElectionServiceStarted; if (debug) { log.info("Successfully started the channel leader election service."); } + brokerSelector = getBrokerSelector(); if (producer != null) { producer.close(); @@ -308,6 +310,18 @@ protected BrokerRegistry getBrokerRegistry() { .get().getBrokerRegistry(); } + @VisibleForTesting + protected LoadManagerContext getContext() { + return ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()) + .get().getContext(); + } + + @VisibleForTesting + protected BrokerSelectionStrategy getBrokerSelector() { + // TODO: make this selector configurable. + return new LeastResourceUsageWithWeight(); + } + public synchronized void close() throws PulsarServerException { channelState = Closed; boolean debug = debug(); @@ -413,14 +427,15 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { } ServiceUnitStateData data = tableview.get(serviceUnit); - ServiceUnitState state = data == null ? Init : data.state(); + ServiceUnitState state = state(data); ownerLookUpCounters.get(state).incrementAndGet(); switch (state) { case Owned, Splitting -> { return CompletableFuture.completedFuture(Optional.of(data.broker())); } case Assigned, Released -> { - return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of); + return deferGetOwnerRequest(serviceUnit).thenApply( + broker -> broker == null ? Optional.empty() : Optional.of(broker)); } case Init, Free -> { return CompletableFuture.completedFuture(Optional.empty()); @@ -492,14 +507,16 @@ private void handle(String serviceUnit, ServiceUnitStateData data) { lookupServiceAddress, serviceUnit, data, totalHandledRequests); } - ServiceUnitState state = data == null ? Init : data.state(); + ServiceUnitState state = state(data); try { switch (state) { case Owned -> handleOwnEvent(serviceUnit, data); case Assigned -> handleAssignEvent(serviceUnit, data); case Released -> handleReleaseEvent(serviceUnit, data); case Splitting -> handleSplitEvent(serviceUnit, data); - case Deleted, Free, Init -> handleInitEvent(serviceUnit); + case Deleted -> handleDeleteEvent(serviceUnit, data); + case Free -> handleFreeEvent(serviceUnit, data); + case Init -> handleInitEvent(serviceUnit); default -> throw new IllegalStateException("Failed to handle channel data:" + data); } } catch (Throwable e){ @@ -534,7 +551,7 @@ private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) { } private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) { - var state = data == null ? Init : data.state(); + var state = state(data); var counter = total ? handlerCounters.get(state).getTotal() : handlerCounters.get(state).getFailure(); if (counter == null) { @@ -580,7 +597,6 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { } private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { - deferGetOwnerRequest(serviceUnit); if (isTargetBroker(data.broker())) { ServiceUnitStateData next = new ServiceUnitStateData( isTransferCommand(data) ? Released : Owned, data.broker(), data.sourceBroker()); @@ -590,7 +606,6 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { } private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { - if (isTransferCommand(data)) { if (isTargetBroker(data.sourceBroker())) { ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker()); @@ -616,24 +631,32 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { } } - private void handleDisableEvent(String serviceUnit, ServiceUnitStateData data) { + private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { + var getOwnerRequest = getOwnerRequests.remove(serviceUnit); + if (getOwnerRequest != null) { + getOwnerRequest.complete(null); + } if (isTargetBroker(data.broker())) { - ServiceUnitStateData next = new ServiceUnitStateData(Free, data.broker()); - pubAsync(serviceUnit, next) - .whenComplete((__, e) -> log(e, serviceUnit, data, next)); + log(null, serviceUnit, data, null); + } + } + + private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { + var getOwnerRequest = getOwnerRequests.remove(serviceUnit); + if (getOwnerRequest != null) { + getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); + } + if (isTargetBroker(data.broker())) { + log(null, serviceUnit, data, null); } } private void handleInitEvent(String serviceUnit) { - closeServiceUnit(serviceUnit) - .thenAccept(__ -> { - var request = getOwnerRequests.remove(serviceUnit); - if (request != null) { - request.completeExceptionally(new IllegalStateException("The ownership has been unloaded. " - + "No owner is found for serviceUnit: " + serviceUnit)); - } - }) - .whenComplete((__, e) -> log(e, serviceUnit, null, null)); + var getOwnerRequest = getOwnerRequests.remove(serviceUnit); + if (getOwnerRequest != null) { + getOwnerRequest.complete(null); + } + log(null, serviceUnit, null, null); } private CompletableFuture pubAsync(String serviceUnit, ServiceUnitStateData data) { @@ -744,10 +767,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, CompletableFuture completionFuture) { CompletableFuture> updateFuture = new CompletableFuture<>(); - if (counter.get() > 0) { - totalBundleSplitRetryCount.incrementAndGet(); - } - pulsar.getNamespaceService().getSplitBoundary(bundle, null).thenAccept(splitBundlesPair -> { // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. if (splitBundlesPair == null) { @@ -863,7 +882,7 @@ private void handleBrokerCreationEvent(String broker) { CompletableFuture future = cleanupJobs.remove(broker); if (future != null) { future.cancel(false); - totalCleanupCancelledCnt++; + totalInactiveBrokerCleanupCancelledCnt++; log.info("Successfully cancelled the ownership cleanup for broker:{}." + " Active cleanup job count:{}", broker, cleanupJobs.size()); @@ -886,7 +905,7 @@ private void handleBrokerDeletionEvent(String broker) { case Stable -> scheduleCleanup(broker, minCleanupDelayTimeInSecs); case Jittery -> scheduleCleanup(broker, maxCleanupDelayTimeInSecs); case Unstable -> { - totalCleanupIgnoredCnt++; + totalInactiveBrokerCleanupIgnoredCnt++; log.error("MetadataState state is unstable. " + "Ignoring the ownership cleanup request for the reported broker :{}", broker); } @@ -897,7 +916,7 @@ private void scheduleCleanup(String broker, long delayInSecs) { cleanupJobs.computeIfAbsent(broker, k -> { Executor delayed = CompletableFuture .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); - totalCleanupScheduledCnt++; + totalInactiveBrokerCleanupScheduledCnt++; return CompletableFuture .runAsync(() -> { try { @@ -915,27 +934,40 @@ private void scheduleCleanup(String broker, long delayInSecs) { broker, delayInSecs, cleanupJobs.size()); } + private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, Set availableBrokers) { + + Optional selectedBroker = brokerSelector.select(availableBrokers, null, getContext()); + if (selectedBroker.isPresent()) { + var override = new ServiceUnitStateData(Owned, selectedBroker.get(), true); + log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", + serviceUnit, orphanData, override); + pubAsync(serviceUnit, override).whenComplete((__, e) -> { + if (e != null) { + log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}", + serviceUnit, orphanData, override, e); + } + }); + } else { + log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker.", + serviceUnit, orphanData); + } + } + - private void doCleanup(String broker) { + private void doCleanup(String broker) throws ExecutionException, InterruptedException, TimeoutException { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); - int serviceUnitTombstoneCnt = 0; + int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); - for (Map.Entry etr : tableview.entrySet()) { - ServiceUnitStateData stateData = etr.getValue(); - String serviceUnit = etr.getKey(); + var availableBrokers = new HashSet(brokerRegistry.getAvailableBrokersAsync() + .get(inFlightStateWaitingTimeInMillis, MILLISECONDS)); + for (var etr : tableview.entrySet()) { + var stateData = etr.getValue(); + var serviceUnit = etr.getKey(); if (StringUtils.equals(broker, stateData.broker()) || StringUtils.equals(broker, stateData.sourceBroker())) { - log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); - tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e != null) { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " - + "cleanupErrorCnt:{}.", - serviceUnit, stateData, - totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); - } - }); - serviceUnitTombstoneCnt++; + overrideOwnership(serviceUnit, stateData, availableBrokers); + orphanServiceUnitCleanupCnt++; } } @@ -945,26 +977,49 @@ private void doCleanup(String broker) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt > 0) { - this.totalCleanupCnt++; - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; - this.totalBrokerCleanupTombstoneCnt++; + if (orphanServiceUnitCleanupCnt > 0) { + this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt; + this.totalInactiveBrokerCleanupCnt++; } double cleanupTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); // TODO: clean load data stores log.info("Completed a cleanup for the inactive broker:{} in {} ms. " - + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " + + "Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, " + "approximate cleanupErrorCnt:{}, metrics:{} ", broker, cleanupTime, - serviceUnitTombstoneCnt, + orphanServiceUnitCleanupCnt, totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), printCleanupMetrics()); cleanupJobs.remove(broker); } + private Optional getOverrideStateData(String serviceUnit, ServiceUnitStateData orphanData, + Set availableBrokers, + LoadManagerContext context) { + if (isTransferCommand(orphanData)) { + // rollback to the src + return Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true)); + } else if (orphanData.state() == Assigned) { // assign + // roll-forward to another broker + Optional selectedBroker = brokerSelector.select(availableBrokers, null, context); + if (selectedBroker.isEmpty()) { + return Optional.empty(); + } + return Optional.of(new ServiceUnitStateData(Owned, selectedBroker.get(), true)); + } else if (orphanData.state() == Splitting || orphanData.state() == Released) { + // rollback to the target broker for split and unload + return Optional.of(new ServiceUnitStateData(Owned, orphanData.broker(), true)); + } else { + var msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s", + serviceUnit, orphanData); + log.error(msg); + throw new IllegalStateException(msg); + } + } + @VisibleForTesting protected void monitorOwnerships(List brokers) { if (!isChannelOwner()) { @@ -980,7 +1035,9 @@ protected void monitorOwnerships(List brokers) { long startTime = System.nanoTime(); Set inactiveBrokers = new HashSet<>(); Set activeBrokers = new HashSet<>(brokers); - int serviceUnitTombstoneCnt = 0; + Map orphanServiceUnits = new HashMap<>(); + int serviceUnitTombstoneCleanupCnt = 0; + int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); long now = System.currentTimeMillis(); for (Map.Entry etr : tableview.entrySet()) { @@ -992,59 +1049,82 @@ protected void monitorOwnerships(List brokers) { inactiveBrokers.add(stateData.broker()); } else if (state != Owned && now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) { - boolean tombstone = false; if (state == Deleted || state == Free) { if (now - stateData.timestamp() > semiTerminalStateWaitingTimeInMillis) { - log.info("Found semi-terminal states to clean" + log.info("Found semi-terminal states to tombstone" + " serviceUnit:{}, stateData:{}", serviceUnit, stateData); - tombstone = true; + tombstoneAsync(serviceUnit).whenComplete((__, e) -> { + if (e != null) { + log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + + "cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); + } + }); + serviceUnitTombstoneCleanupCnt++; } } else { - log.warn("Found long-running orphan(in-flight) serviceUnit:{}, stateData:{}", - serviceUnit, stateData); - tombstone = true; + log.warn("Found orphan serviceUnit:{}, stateData:{}", serviceUnit, stateData); + orphanServiceUnits.put(serviceUnit, stateData); } + } + } - - if (tombstone) { - tombstoneAsync(serviceUnit).whenComplete((__, e) -> { + // Skip cleaning orphan bundles if inactiveBrokers exist. This is a bigger problem. + if (!inactiveBrokers.isEmpty()) { + for (String inactiveBroker : inactiveBrokers) { + handleBrokerDeletionEvent(inactiveBroker); + } + } else if (!orphanServiceUnits.isEmpty()) { + var context = getContext(); + for (var etr : orphanServiceUnits.entrySet()) { + var orphanServiceUnit = etr.getKey(); + var orphanData = etr.getValue(); + var overrideData = getOverrideStateData( + orphanServiceUnit, orphanData, activeBrokers, context); + if (overrideData.isPresent()) { + pubAsync(orphanServiceUnit, overrideData.get()).whenComplete((__, e) -> { if (e != null) { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + log.error("Failed cleaning the ownership orphanServiceUnit:{}, orphanData:{}, " + "cleanupErrorCnt:{}.", - serviceUnit, stateData, + orphanServiceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e); } }); - serviceUnitTombstoneCnt++; + orphanServiceUnitCleanupCnt++; + } else { + log.warn("Failed get the overrideStateData from orphanServiceUnit:{}, orphanData:{}. will retry..", + orphanServiceUnit, orphanData); } } } - for (String inactiveBroker : inactiveBrokers) { - handleBrokerDeletionEvent(inactiveBroker); - } - try { producer.flush(); } catch (PulsarClientException e) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt > 0) { - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; + if (serviceUnitTombstoneCleanupCnt > 0) { + this.totalServiceUnitTombstoneCleanupCnt += serviceUnitTombstoneCleanupCnt; + } + + if (orphanServiceUnitCleanupCnt > 0) { + this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt; } double monitorTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); log.info("Completed the ownership monitor run in {} ms. " - + "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. " - + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "approximate cleanupErrorCnt:{}, metrics:{} ", + + "Scheduled cleanups for inactive brokers:{}. inactiveBrokerCount:{}. " + + "Published cleanups for orphan service units, orphanServiceUnitCleanupCnt:{}. " + + "Tombstoned semi-terminal state service units, serviceUnitTombstoneCleanupCnt:{}. " + + "Approximate cleanupErrorCnt:{}, metrics:{}. ", monitorTime, - inactiveBrokers, - inactiveBrokers.size(), - serviceUnitTombstoneCnt, + inactiveBrokers, inactiveBrokers.size(), + orphanServiceUnitCleanupCnt, + serviceUnitTombstoneCleanupCnt, totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), printCleanupMetrics()); @@ -1052,17 +1132,19 @@ protected void monitorOwnerships(List brokers) { private String printCleanupMetrics() { return String.format( - "{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, " - + "totalServiceUnitCleanupTombstoneCnt:%d, totalCleanupErrorCnt:%d, " - + "totalCleanupScheduledCnt%d, totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, " + "{totalInactiveBrokerCleanupCnt:%d, " + + "totalServiceUnitTombstoneCleanupCnt:%d, totalOrphanServiceUnitCleanupCnt:%d, " + + "totalCleanupErrorCnt:%d, " + + "totalInactiveBrokerCleanupScheduledCnt%d, totalInactiveBrokerCleanupIgnoredCnt:%d, " + + "totalInactiveBrokerCleanupCancelledCnt:%d, " + " activeCleanupJobs:%d}", - totalCleanupCnt, - totalBrokerCleanupTombstoneCnt, - totalServiceUnitCleanupTombstoneCnt, + totalInactiveBrokerCleanupCnt, + totalServiceUnitTombstoneCleanupCnt, + totalOrphanServiceUnitCleanupCnt, totalCleanupErrorCnt.get(), - totalCleanupScheduledCnt, - totalCleanupIgnoredCnt, - totalCleanupCancelledCnt, + totalInactiveBrokerCleanupScheduledCnt, + totalInactiveBrokerCleanupIgnoredCnt, + totalInactiveBrokerCleanupCancelledCnt, cleanupJobs.size() ); } @@ -1127,15 +1209,6 @@ public List getMetrics() { } } - - { - var dim = new HashMap<>(dimensions); - dim.put("result", "Total"); - var metric = Metrics.create(dim); - metric.put("brk_sunit_state_chn_cleanup_ops_total", totalCleanupCnt); - metrics.add(metric); - } - { var dim = new HashMap<>(dimensions); dim.put("result", "Failure"); @@ -1148,7 +1221,7 @@ public List getMetrics() { var dim = new HashMap<>(dimensions); dim.put("result", "Skip"); var metric = Metrics.create(dim); - metric.put("brk_sunit_state_chn_cleanup_ops_total", totalCleanupIgnoredCnt); + metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", totalInactiveBrokerCleanupIgnoredCnt); metrics.add(metric); } @@ -1156,7 +1229,7 @@ public List getMetrics() { var dim = new HashMap<>(dimensions); dim.put("result", "Cancel"); var metric = Metrics.create(dim); - metric.put("brk_sunit_state_chn_cleanup_ops_total", totalCleanupCancelledCnt); + metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", totalInactiveBrokerCleanupCancelledCnt); metrics.add(metric); } @@ -1164,13 +1237,14 @@ public List getMetrics() { var dim = new HashMap<>(dimensions); dim.put("result", "Schedule"); var metric = Metrics.create(dim); - metric.put("brk_sunit_state_chn_cleanup_ops_total", totalCleanupScheduledCnt); + metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", totalInactiveBrokerCleanupScheduledCnt); metrics.add(metric); } var metric = Metrics.create(dimensions); - metric.put("brk_sunit_state_chn_broker_cleanup_ops_total", totalBrokerCleanupTombstoneCnt); - metric.put("brk_sunit_state_chn_su_cleanup_ops_total", totalServiceUnitCleanupTombstoneCnt); + metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", totalInactiveBrokerCleanupCnt); + metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", totalOrphanServiceUnitCleanupCnt); + metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", totalServiceUnitTombstoneCleanupCnt); metrics.add(metric); return metrics; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java index 0833569cf5f1a..1c3196df97c66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java @@ -20,7 +20,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Init; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Schema; @@ -48,8 +48,16 @@ public void checkBrokers(boolean check) { @Override public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { - ServiceUnitState prevState = from == null ? Init : from.state(); - ServiceUnitState state = to == null ? Init : to.state(); + if (to == null) { + return false; + } else if (to.force()) { + return false; + } + + + ServiceUnitState prevState = state(from); + ServiceUnitState state = state(to); + if (!ServiceUnitState.isValidTransition(prevState, state)) { return true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java index 6a7bf97b46b88..6a04431de64d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java @@ -27,7 +27,8 @@ * This data will be broadcast in ServiceUnitStateChannel. */ -public record ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker, long timestamp) { +public record ServiceUnitStateData( + ServiceUnitState state, String broker, String sourceBroker, boolean force, long timestamp) { public ServiceUnitStateData { Objects.requireNonNull(state); @@ -37,10 +38,18 @@ public record ServiceUnitStateData(ServiceUnitState state, String broker, String } public ServiceUnitStateData(ServiceUnitState state, String broker, String sourceBroker) { - this(state, broker, sourceBroker, System.currentTimeMillis()); + this(state, broker, sourceBroker, false, System.currentTimeMillis()); } public ServiceUnitStateData(ServiceUnitState state, String broker) { - this(state, broker, null, System.currentTimeMillis()); + this(state, broker, null, false, System.currentTimeMillis()); + } + + public ServiceUnitStateData(ServiceUnitState state, String broker, boolean force) { + this(state, broker, null, force, System.currentTimeMillis()); + } + + public static ServiceUnitState state(ServiceUnitStateData data) { + return data == null ? ServiceUnitState.Init : data.state(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 8d386371b6bf4..39756006b24bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -392,13 +392,15 @@ SplitDecision.Reason.Balanced, new MutableLong(6) } { - FieldUtils.writeDeclaredField(channel1, "totalCleanupCnt", 1, true); - FieldUtils.writeDeclaredField(channel1, "totalBrokerCleanupTombstoneCnt", 2, true); - FieldUtils.writeDeclaredField(channel1, "totalServiceUnitCleanupTombstoneCnt", 3, true); + + FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCnt", 1, true); + FieldUtils.writeDeclaredField(channel1, "totalServiceUnitTombstoneCleanupCnt", 2, true); + FieldUtils.writeDeclaredField(channel1, "totalOrphanServiceUnitCleanupCnt", 3, true); FieldUtils.writeDeclaredField(channel1, "totalCleanupErrorCnt", new AtomicLong(4), true); - FieldUtils.writeDeclaredField(channel1, "totalCleanupScheduledCnt", 5, true); - FieldUtils.writeDeclaredField(channel1, "totalCleanupIgnoredCnt", 6, true); - FieldUtils.writeDeclaredField(channel1, "totalCleanupCancelledCnt", 7, true); + FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupScheduledCnt", 5, true); + FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupIgnoredCnt", 6, true); + FieldUtils.writeDeclaredField(channel1, "totalInactiveBrokerCleanupCancelledCnt", 7, true); + Map ownerLookUpCounters = new LinkedHashMap<>(); Map handlerCounters = new LinkedHashMap<>(); Map eventCounters = @@ -431,9 +433,6 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, feature=max_ema, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=4.0}] dimensions=[{broker=localhost, feature=max, metric=loadBalancing}], metrics=[{brk_lb_resource_usage=0.04}] dimensions=[{broker=localhost, metric=bundleUnloading}], metrics=[{brk_lb_unload_broker_total=2, brk_lb_unload_bundle_total=3}] - dimensions=[{broker=localhost, metric=bundleUnloading, reason=Overloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=1}] - dimensions=[{broker=localhost, metric=bundleUnloading, reason=Underloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=2}] - dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Failure}], metrics=[{brk_lb_unload_broker_breakdown_total=10}] dimensions=[{broker=localhost, metric=bundleUnloading, reason=Balanced, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=3}] dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBundles, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=4}] dimensions=[{broker=localhost, metric=bundleUnloading, reason=CoolDown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=5}] @@ -441,6 +440,9 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoLoadData, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=7}] dimensions=[{broker=localhost, metric=bundleUnloading, reason=NoBrokers, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=8}] dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Skip}], metrics=[{brk_lb_unload_broker_breakdown_total=9}] + dimensions=[{broker=localhost, metric=bundleUnloading, reason=Overloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=1}] + dimensions=[{broker=localhost, metric=bundleUnloading, reason=Underloaded, result=Success}], metrics=[{brk_lb_unload_broker_breakdown_total=2}] + dimensions=[{broker=localhost, metric=bundleUnloading, reason=Unknown, result=Failure}], metrics=[{brk_lb_unload_broker_breakdown_total=10}] dimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=avg}], metrics=[{brk_lb_resource_usage_stats=1.5}] dimensions=[{broker=localhost, feature=max_ema, metric=bundleUnloading, stat=std}], metrics=[{brk_lb_resource_usage_stats=0.3}] dimensions=[{broker=localhost, metric=bundlesSplit}], metrics=[{brk_lb_bundles_split_total=35}] @@ -451,9 +453,9 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}] dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}] + dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}] dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}] dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}] - dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}] dimensions=[{broker=localhost, metric=sunitStateChn, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}] dimensions=[{broker=localhost, metric=sunitStateChn, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}] dimensions=[{broker=localhost, metric=sunitStateChn, state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}] @@ -481,12 +483,11 @@ SplitDecision.Reason.Balanced, new MutableLong(6) dimensions=[{broker=localhost, event=Splitting, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=12}] dimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=13}] dimensions=[{broker=localhost, event=Deleted, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_subscribe_ops_total=14}] - dimensions=[{broker=localhost, metric=sunitStateChn, result=Total}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=1}] dimensions=[{broker=localhost, metric=sunitStateChn, result=Failure}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=4}] - dimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=6}] - dimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=7}] - dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_cleanup_ops_total=5}] - dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_broker_cleanup_ops_total=2, brk_sunit_state_chn_su_cleanup_ops_total=3}] + dimensions=[{broker=localhost, metric=sunitStateChn, result=Skip}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}] + dimensions=[{broker=localhost, metric=sunitStateChn, result=Cancel}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}] + dimensions=[{broker=localhost, metric=sunitStateChn, result=Schedule}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}] + dimensions=[{broker=localhost, metric=sunitStateChn}], metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1, brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}] """.split("\n")); var actual = primaryLoadManager.getMetrics().stream().map(m -> m.toString()).collect(Collectors.toSet()); assertEquals(actual, expected); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 663c6e295c87d..255488f0bbc78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -29,6 +29,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; @@ -75,8 +76,10 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl; +import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.Producer; @@ -108,8 +111,12 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private String bundle2; private PulsarTestContext additionalPulsarTestContext; + private LoadManagerContext loadManagerContext; + private BrokerRegistryImpl registry; + private BrokerSelectionStrategy brokerSelector; + @BeforeClass @Override protected void setup() throws Exception { @@ -124,10 +131,11 @@ protected void setup() throws Exception { pulsar1 = pulsar; registry = new BrokerRegistryImpl(pulsar); + loadManagerContext = mock(LoadManagerContext.class); + brokerSelector = mock(BrokerSelectionStrategy.class); additionalPulsarTestContext = createAdditionalPulsarTestContext(getDefaultConf()); pulsar2 = additionalPulsarTestContext.getPulsarService(); - channel1 = createChannel(pulsar1); channel1.start(); @@ -429,8 +437,7 @@ public void transferTest() @Test(priority = 5) public void transferTestWhenDestBrokerFails() - throws ExecutionException, InterruptedException, IllegalAccessException, PulsarServerException, - TimeoutException { + throws ExecutionException, InterruptedException, IllegalAccessException { var getOwnerRequests1 = getOwnerRequests(channel1); var getOwnerRequests2 = getOwnerRequests(channel2); @@ -483,9 +490,8 @@ public void transferTestWhenDestBrokerFails() assertEquals(0, getOwnerRequests1.size()); assertEquals(0, getOwnerRequests2.size()); - // recovered, check the monitor update state : Assigned -> Init + // recovered, check the monitor update state : Assigned -> Owned FieldUtils.writeDeclaredField(channel2, "producer", producer, true); - FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); FieldUtils.writeDeclaredField(channel2, @@ -495,8 +501,25 @@ public void transferTestWhenDestBrokerFails() List.of(lookupServiceAddress1, lookupServiceAddress2)); ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships( List.of(lookupServiceAddress1, lookupServiceAddress2)); - waitUntilState(channel1, bundle, Init); - waitUntilState(channel2, bundle, Init); + + + waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); + waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); + ownerAddr1 = channel1.getOwnerAsync(bundle).get(); + ownerAddr2 = channel2.getOwnerAsync(bundle).get(); + + assertEquals(ownerAddr1, ownerAddr2); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); + + var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + validateMonitorCounters(leader, + 0, + 0, + 1, + 0, + 0, + 0, + 0); FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); @@ -559,7 +582,7 @@ public void splitAndRetryTest() throws Exception { assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle2).get()); - // try the monitor and check the monitor moves `Disabled` -> `Init` + // try the monitor and check the monitor moves `Deleted` -> `Init` FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 1 , true); FieldUtils.writeDeclaredField(channel1, @@ -577,6 +600,16 @@ public void splitAndRetryTest() throws Exception { waitUntilState(channel1, bundle, Init); waitUntilState(channel2, bundle, Init); + var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + validateMonitorCounters(leader, + 0, + 1, + 0, + 0, + 0, + 0, + 0); + cleanTableView(channel1, childBundle1); cleanTableView(channel2, childBundle1); cleanTableView(channel1, childBundle2); @@ -671,6 +704,7 @@ public void handleBrokerDeletionEventTest() var owner1 = channel1.getOwnerAsync(bundle1); var owner2 = channel2.getOwnerAsync(bundle2); + doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), any(), any()); assertTrue(owner1.get().isEmpty()); assertTrue(owner2.get().isEmpty()); @@ -693,26 +727,30 @@ public void handleBrokerDeletionEventTest() leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); followerChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); - waitUntilNewOwner(channel1, bundle1, null); - waitUntilNewOwner(channel2, bundle1, null); - waitUntilNewOwner(channel1, bundle2, null); - waitUntilNewOwner(channel2, bundle2, null); + waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2); + waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2); + waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2); + waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2); verify(leaderCleanupJobs, times(1)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); + + assertEquals(0, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 1, + 0, + 2, + 0, + 1, + 0, + 0); + // test jittery metadata state - channel1.publishAssignEventAsync(bundle1, broker); - channel2.publishAssignEventAsync(bundle2, broker); + channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker))); + channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker))); waitUntilNewOwner(channel1, bundle1, broker); waitUntilNewOwner(channel2, bundle1, broker); waitUntilNewOwner(channel1, bundle2, broker); @@ -727,13 +765,14 @@ public void handleBrokerDeletionEventTest() verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); assertEquals(1, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 1, + 0, + 2, + 0, + 2, + 0, + 0); // broker is back online leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Created); @@ -743,13 +782,14 @@ public void handleBrokerDeletionEventTest() verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); assertEquals(0, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 1, + 0, + 2, + 0, + 2, + 0, + 1); // broker is offline again @@ -761,35 +801,37 @@ public void handleBrokerDeletionEventTest() verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); assertEquals(1, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 1, + 0, + 2, + 0, + 3, + 0, + 1); // finally cleanup - waitUntilNewOwner(channel1, bundle1, null); - waitUntilNewOwner(channel2, bundle1, null); - waitUntilNewOwner(channel1, bundle2, null); - waitUntilNewOwner(channel2, bundle2, null); + waitUntilNewOwner(channel1, bundle1, lookupServiceAddress2); + waitUntilNewOwner(channel2, bundle1, lookupServiceAddress2); + waitUntilNewOwner(channel1, bundle2, lookupServiceAddress2); + waitUntilNewOwner(channel2, bundle2, lookupServiceAddress2); verify(leaderCleanupJobs, times(3)).computeIfAbsent(eq(broker), any()); verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); assertEquals(0, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 2, + 0, + 4, + 0, + 3, + 0, + 1); // test unstable state - channel1.publishAssignEventAsync(bundle1, broker); - channel2.publishAssignEventAsync(bundle2, broker); + channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle1, Optional.of(broker))); + channel1.publishUnloadEventAsync(new Unload(lookupServiceAddress2, bundle2, Optional.of(broker))); waitUntilNewOwner(channel1, bundle1, broker); waitUntilNewOwner(channel2, bundle1, broker); waitUntilNewOwner(channel1, bundle2, broker); @@ -804,13 +846,14 @@ public void handleBrokerDeletionEventTest() verify(followerCleanupJobs, times(0)).computeIfAbsent(eq(broker), any()); assertEquals(0, leaderCleanupJobs.size()); assertEquals(0, followerCleanupJobs.size()); - assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); - assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); - assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); - assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); - assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); + validateMonitorCounters(leaderChannel, + 2, + 0, + 4, + 0, + 3, + 1, + 1); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -969,6 +1012,17 @@ public void unloadTest() waitUntilState(channel1, bundle, Init); waitUntilState(channel2, bundle, Init); + var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + validateMonitorCounters(leader, + 0, + 1, + 0, + 0, + 0, + 0, + 0); + + FieldUtils.writeDeclaredField(channel1, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); FieldUtils.writeDeclaredField(channel1, @@ -980,6 +1034,169 @@ public void unloadTest() "semiTerminalStateWaitingTimeInMillis", 300 * 1000, true); } + @Test(priority = 13) + public void assignTestWhenDestBrokerFails() + throws ExecutionException, InterruptedException, IllegalAccessException { + + Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty()); + + channel1.publishUnloadEventAsync(unload); + + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); + + assertEquals(Optional.empty(), channel1.getOwnerAsync(bundle).get()); + assertEquals(Optional.empty(), channel2.getOwnerAsync(bundle).get()); + + var producer = (Producer) FieldUtils.readDeclaredField(channel1, + "producer", true); + var spyProducer = spy(producer); + var msg = mock(TypedMessageBuilder.class); + var future = CompletableFuture.failedFuture(new RuntimeException()); + doReturn(msg).when(spyProducer).newMessage(); + doReturn(msg).when(msg).key(any()); + doReturn(msg).when(msg).value(any()); + doReturn(future).when(msg).sendAsync(); + FieldUtils.writeDeclaredField(channel2, "producer", spyProducer, true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), any(), any()); + channel1.publishAssignEventAsync(bundle, lookupServiceAddress2); + // channel1 is broken. the assign won't be complete. + waitUntilState(channel1, bundle); + waitUntilState(channel2, bundle); + var owner1 = channel1.getOwnerAsync(bundle); + var owner2 = channel2.getOwnerAsync(bundle); + + assertFalse(owner1.isDone()); + assertFalse(owner2.isDone()); + + // In 5 secs, the getOwnerAsync requests(lookup requests) should time out. + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally())); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally())); + + // recovered, check the monitor update state : Assigned -> Owned + FieldUtils.writeDeclaredField(channel2, "producer", producer, true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 1 , true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 1 , true); + + ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships( + List.of(lookupServiceAddress1, lookupServiceAddress2)); + ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships( + List.of(lookupServiceAddress1, lookupServiceAddress2)); + + + waitUntilNewOwner(channel1, bundle, lookupServiceAddress2); + waitUntilNewOwner(channel2, bundle, lookupServiceAddress2); + var ownerAddr1 = channel1.getOwnerAsync(bundle).get(); + var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); + + assertEquals(ownerAddr1, ownerAddr2); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2)); + + var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + validateMonitorCounters(leader, + 0, + 0, + 1, + 0, + 0, + 0, + 0); + + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + + } + + @Test(priority = 14) + public void splitTestWhenDestBrokerFails() + throws ExecutionException, InterruptedException, IllegalAccessException { + + + Unload unload = new Unload(lookupServiceAddress1, bundle, Optional.empty()); + + channel1.publishUnloadEventAsync(unload); + + waitUntilState(channel1, bundle, Free); + waitUntilState(channel2, bundle, Free); + + channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); + + waitUntilState(channel1, bundle, Owned); + waitUntilState(channel2, bundle, Owned); + + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get()); + assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get().get()); + + var producer = (Producer) FieldUtils.readDeclaredField(channel1, + "producer", true); + var spyProducer = spy(producer); + var msg = mock(TypedMessageBuilder.class); + var future = CompletableFuture.failedFuture(new RuntimeException()); + doReturn(msg).when(spyProducer).newMessage(); + doReturn(msg).when(msg).key(any()); + doReturn(msg).when(msg).value(any()); + doReturn(future).when(msg).sendAsync(); + FieldUtils.writeDeclaredField(channel1, "producer", spyProducer, true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + channel2.publishSplitEventAsync(new Split(bundle, lookupServiceAddress1, null)); + // channel1 is broken. the split won't be complete. + waitUntilState(channel1, bundle); + waitUntilState(channel2, bundle); + var owner1 = channel1.getOwnerAsync(bundle); + var owner2 = channel2.getOwnerAsync(bundle); + + + // recovered, check the monitor update state : Splitting -> Owned + FieldUtils.writeDeclaredField(channel1, "producer", producer, true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 1 , true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 1 , true); + + ((ServiceUnitStateChannelImpl) channel1).monitorOwnerships( + List.of(lookupServiceAddress1, lookupServiceAddress2)); + ((ServiceUnitStateChannelImpl) channel2).monitorOwnerships( + List.of(lookupServiceAddress1, lookupServiceAddress2)); + + + waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); + waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); + var ownerAddr1 = channel1.getOwnerAsync(bundle).get(); + var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); + + assertEquals(ownerAddr1, ownerAddr2); + assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1)); + + var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; + validateMonitorCounters(leader, + 0, + 0, + 1, + 0, + 0, + 0, + 0); + + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + + } + private static ConcurrentOpenHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { return (ConcurrentOpenHashMap>>) @@ -1076,7 +1293,7 @@ private static void waitUntilState(ServiceUnitStateChannel channel, String key, .atMost(10, TimeUnit.SECONDS) .until(() -> { // wait until true ServiceUnitStateData data = tv.get(key); - ServiceUnitState actual = data == null ? Init : data.state(); + ServiceUnitState actual = state(data); return actual == expected; }); } @@ -1133,13 +1350,13 @@ private static void cleanOpsCounters(ServiceUnitStateChannel channel) } private void cleanOwnershipMonitorCounters(ServiceUnitStateChannel channel) throws IllegalAccessException { - FieldUtils.writeDeclaredField(channel, "totalCleanupCnt", 0, true); - FieldUtils.writeDeclaredField(channel, "totalBrokerCleanupTombstoneCnt", 0, true); - FieldUtils.writeDeclaredField(channel, "totalServiceUnitCleanupTombstoneCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalServiceUnitTombstoneCleanupCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalOrphanServiceUnitCleanupCnt", 0, true); FieldUtils.writeDeclaredField(channel, "totalCleanupErrorCnt", new AtomicLong(0), true); - FieldUtils.writeDeclaredField(channel, "totalCleanupScheduledCnt", 0, true); - FieldUtils.writeDeclaredField(channel, "totalCleanupIgnoredCnt", 0, true); - FieldUtils.writeDeclaredField(channel, "totalCleanupCancelledCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupScheduledCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupIgnoredCnt", 0, true); + FieldUtils.writeDeclaredField(channel, "totalInactiveBrokerCleanupCancelledCnt", 0, true); } private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric) @@ -1236,13 +1453,38 @@ private static void validateOwnerLookUpCounters(ServiceUnitStateChannel channel, }); } + private static void validateMonitorCounters(ServiceUnitStateChannel channel, + long totalInactiveBrokerCleanupCnt, + long totalServiceUnitTombstoneCleanupCnt, + long totalOrphanServiceUnitCleanupCnt, + long totalCleanupErrorCnt, + long totalInactiveBrokerCleanupScheduledCnt, + long totalInactiveBrokerCleanupIgnoredCnt, + long totalInactiveBrokerCleanupCancelledCnt) + throws IllegalAccessException { + assertEquals(totalInactiveBrokerCleanupCnt, getCleanupMetric(channel, "totalInactiveBrokerCleanupCnt")); + assertEquals(totalServiceUnitTombstoneCleanupCnt, + getCleanupMetric(channel, "totalServiceUnitTombstoneCleanupCnt")); + assertEquals(totalOrphanServiceUnitCleanupCnt, getCleanupMetric(channel, "totalOrphanServiceUnitCleanupCnt")); + assertEquals(totalCleanupErrorCnt, getCleanupMetric(channel, "totalCleanupErrorCnt")); + assertEquals(totalInactiveBrokerCleanupScheduledCnt, + getCleanupMetric(channel, "totalInactiveBrokerCleanupScheduledCnt")); + assertEquals(totalInactiveBrokerCleanupIgnoredCnt, + getCleanupMetric(channel, "totalInactiveBrokerCleanupIgnoredCnt")); + assertEquals(totalInactiveBrokerCleanupCancelledCnt, + getCleanupMetric(channel, "totalInactiveBrokerCleanupCancelledCnt")); + } + ServiceUnitStateChannelImpl createChannel(PulsarService pulsar) throws IllegalAccessException { var tmpChannel = new ServiceUnitStateChannelImpl(pulsar); FieldUtils.writeDeclaredField(tmpChannel, "ownershipMonitorDelayTimeInSecs", 5, true); var channel = spy(tmpChannel); + doReturn(loadManagerContext).when(channel).getContext(); doReturn(registry).when(channel).getBrokerRegistry(); + doReturn(brokerSelector).when(channel).getBrokerSelector(); + return channel; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java index 44335e00f98f4..4a5ac391a917c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java @@ -48,7 +48,15 @@ ServiceUnitStateData data(ServiceUnitState state, String src, String dst) { public void test() throws InterruptedException { String dst = "dst"; String src = "src"; - assertFalse(strategy.shouldKeepLeft(data(Init), data(Init))); + + assertFalse(strategy.shouldKeepLeft( + new ServiceUnitStateData(Init, dst), + new ServiceUnitStateData(Init, dst, true))); + + assertFalse(strategy.shouldKeepLeft( + data(Owned), null)); + + assertTrue(strategy.shouldKeepLeft(data(Init), data(Init))); assertFalse(strategy.shouldKeepLeft(data(Init), data(Free))); assertFalse(strategy.shouldKeepLeft(data(Init), data(Assigned))); assertFalse(strategy.shouldKeepLeft(data(Init), data(Owned))); @@ -56,7 +64,7 @@ public void test() throws InterruptedException { assertFalse(strategy.shouldKeepLeft(data(Init), data(Splitting))); assertFalse(strategy.shouldKeepLeft(data(Init), data(Deleted))); - assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Init))); + assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Init))); assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Free))); assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Assigned))); assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst1"), data(Owned, "dst2"))); @@ -69,7 +77,7 @@ public void test() throws InterruptedException { assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Splitting, dst))); assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Deleted, dst))); - assertFalse(strategy.shouldKeepLeft(data(Owned), data(Init))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Init))); assertTrue(strategy.shouldKeepLeft(data(Owned), data(Free))); assertTrue(strategy.shouldKeepLeft(data(Owned, src, "dst1"), data(Assigned, src, "dst2"))); assertTrue(strategy.shouldKeepLeft(data(Owned, src, dst), data(Assigned, dst))); @@ -88,7 +96,7 @@ public void test() throws InterruptedException { assertFalse(strategy.shouldKeepLeft(data(Owned, src, dst), data(Splitting, dst))); assertTrue(strategy.shouldKeepLeft(data(Owned), data(Deleted, dst))); - assertFalse(strategy.shouldKeepLeft(data(Released), data(Init))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Init))); assertFalse(strategy.shouldKeepLeft(data(Released), data(Free))); assertTrue(strategy.shouldKeepLeft(data(Released, "dst1"), data(Free, "dst2"))); assertTrue(strategy.shouldKeepLeft(data(Released, "src1", dst), data(Free, "src2", dst))); @@ -101,7 +109,7 @@ public void test() throws InterruptedException { assertTrue(strategy.shouldKeepLeft(data(Released), data(Splitting))); assertTrue(strategy.shouldKeepLeft(data(Released), data(Deleted, dst))); - assertFalse(strategy.shouldKeepLeft(data(Splitting), data(Init))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Init))); assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Free))); assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigned))); assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned))); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java index 77b6c97508ade..1a913cdbaabee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java @@ -35,7 +35,7 @@ public class ServiceUnitStateTest { @Test public void testTransitions() { - assertTrue(ServiceUnitState.isValidTransition(Init, Init)); + assertFalse(ServiceUnitState.isValidTransition(Init, Init)); assertTrue(ServiceUnitState.isValidTransition(Init, Free)); assertTrue(ServiceUnitState.isValidTransition(Init, Owned)); assertTrue(ServiceUnitState.isValidTransition(Init, Assigned)); @@ -51,7 +51,7 @@ public void testTransitions() { assertFalse(ServiceUnitState.isValidTransition(Free, Splitting)); assertFalse(ServiceUnitState.isValidTransition(Free, Deleted)); - assertTrue(ServiceUnitState.isValidTransition(Assigned, Init)); + assertFalse(ServiceUnitState.isValidTransition(Assigned, Init)); assertFalse(ServiceUnitState.isValidTransition(Assigned, Free)); assertFalse(ServiceUnitState.isValidTransition(Assigned, Assigned)); assertTrue(ServiceUnitState.isValidTransition(Assigned, Owned)); @@ -59,7 +59,7 @@ public void testTransitions() { assertFalse(ServiceUnitState.isValidTransition(Assigned, Splitting)); assertFalse(ServiceUnitState.isValidTransition(Assigned, Deleted)); - assertTrue(ServiceUnitState.isValidTransition(Owned, Init)); + assertFalse(ServiceUnitState.isValidTransition(Owned, Init)); assertFalse(ServiceUnitState.isValidTransition(Owned, Free)); assertTrue(ServiceUnitState.isValidTransition(Owned, Assigned)); assertFalse(ServiceUnitState.isValidTransition(Owned, Owned)); @@ -67,7 +67,7 @@ public void testTransitions() { assertTrue(ServiceUnitState.isValidTransition(Owned, Splitting)); assertFalse(ServiceUnitState.isValidTransition(Owned, Deleted)); - assertTrue(ServiceUnitState.isValidTransition(Released, Init)); + assertFalse(ServiceUnitState.isValidTransition(Released, Init)); assertTrue(ServiceUnitState.isValidTransition(Released, Free)); assertFalse(ServiceUnitState.isValidTransition(Released, Assigned)); assertTrue(ServiceUnitState.isValidTransition(Released, Owned)); @@ -75,7 +75,7 @@ public void testTransitions() { assertFalse(ServiceUnitState.isValidTransition(Released, Splitting)); assertFalse(ServiceUnitState.isValidTransition(Released, Deleted)); - assertTrue(ServiceUnitState.isValidTransition(Splitting, Init)); + assertFalse(ServiceUnitState.isValidTransition(Splitting, Init)); assertFalse(ServiceUnitState.isValidTransition(Splitting, Free)); assertFalse(ServiceUnitState.isValidTransition(Splitting, Assigned)); assertFalse(ServiceUnitState.isValidTransition(Splitting, Owned)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java index 8906d8313d9fb..61299bc79dabf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java @@ -25,6 +25,7 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -196,10 +197,21 @@ TestData generateTestData() throws PulsarAdminException, PulsarClientException { int keyIndex = r.nextInt(maxKeys); String key = "key" + keyIndex; ServiceUnitStateData prev = expected.get(key); - ServiceUnitState prevState = prev == null ? Init : prev.state(); - ServiceUnitState state = r.nextBoolean() ? nextInvalidState(prevState) : + ServiceUnitState prevState = state(prev); + boolean invalid = r.nextBoolean(); + ServiceUnitState state = invalid ? nextInvalidState(prevState) : nextValidState(prevState); - ServiceUnitStateData value = new ServiceUnitStateData(state, key + ":" + j); + ServiceUnitStateData value; + if (invalid) { + value = new ServiceUnitStateData(state, key + ":" + j, false); + } else { + if (state == Init) { + value = new ServiceUnitStateData(state, key + ":" + j, true); + } else { + value = new ServiceUnitStateData(state, key + ":" + j, false); + } + } + producer.newMessage().key(key).value(value).send(); if (!strategy.shouldKeepLeft(prev, value)) { expected.put(key, value); @@ -719,9 +731,9 @@ public void testCompactionWithLastDeletedKey() throws Exception { pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); - producer.newMessage().key("1").value(testValue(Owned, "1")).send(); - producer.newMessage().key("2").value(testValue(Owned, "3")).send(); - producer.newMessage().key("3").value(testValue(Owned, "5")).send(); + producer.newMessage().key("1").value(testValue("1")).send(); + producer.newMessage().key("2").value(testValue("3")).send(); + producer.newMessage().key("3").value(testValue( "5")).send(); producer.newMessage().key("1").value(null).send(); producer.newMessage().key("2").value(null).send();