diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d5cf6a3e74d0e..5446060ac6502 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1028,7 +1028,9 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, - getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)); + pulsar().getNamespaceService() + .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), + splitBoundaries)); }); } @@ -1109,18 +1111,6 @@ private CompletableFuture findHotBundleAsync(NamespaceName name .getBundleWithHighestThroughputAsync(namespaceName); } - private NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { - NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); - if (algorithm == null) { - algorithm = NamespaceBundleSplitAlgorithm.of( - pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); - } - if (algorithm == null) { - algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; - } - return algorithm; - } - protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { validateSuperUserAccess(); log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index d5bcd3e1436cb..d10138bda6805 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; @@ -35,7 +37,9 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,6 +52,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import lombok.AllArgsConstructor; import lombok.Getter; @@ -60,18 +65,23 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; -import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; +import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -523,8 +533,7 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.broker())) { - splitServiceUnit(serviceUnit) - .thenCompose(__ -> tombstoneAsync(serviceUnit)) + splitServiceUnit(serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } } @@ -625,25 +634,107 @@ private CompletableFuture closeServiceUnit(String serviceUnit) { }); } - private CompletableFuture splitServiceUnit(String serviceUnit) { - // TODO: after the split we need to write the child ownerships to BSC instead of ZK. + private CompletableFuture splitServiceUnit(String serviceUnit, ServiceUnitStateData data) { + // Write the child ownerships to BSC. long startTime = System.nanoTime(); - return pulsar.getNamespaceService() - .splitAndOwnBundle(getNamespaceBundle(serviceUnit), - false, - NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()), - null) - .whenComplete((__, ex) -> { - double splitBundleTime = TimeUnit.NANOSECONDS - .toMillis((System.nanoTime() - startTime)); - if (ex == null) { - log.info("Successfully split {} namespace-bundle in {} ms", - serviceUnit, splitBundleTime); - } else { - log.error("Failed to split {} namespace-bundle in {} ms", - serviceUnit, splitBundleTime, ex); - } - }); + NamespaceService namespaceService = pulsar.getNamespaceService(); + NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory(); + NamespaceBundle bundle = getNamespaceBundle(serviceUnit); + CompletableFuture completionFuture = new CompletableFuture<>(); + final AtomicInteger counter = new AtomicInteger(0); + this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, bundle, serviceUnit, data, + counter, startTime, completionFuture); + return completionFuture; + } + + @VisibleForTesting + protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, + NamespaceBundleFactory bundleFactory, + NamespaceBundle bundle, + String serviceUnit, + ServiceUnitStateData data, + AtomicInteger counter, + long startTime, + CompletableFuture completionFuture) { + CompletableFuture> updateFuture = new CompletableFuture<>(); + + pulsar.getNamespaceService().getSplitBoundary(bundle, null).thenAccept(splitBundlesPair -> { + // Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper. + if (splitBundlesPair == null) { + String msg = format("Bundle %s not found under namespace", serviceUnit); + updateFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); + return; + } + ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker()); + NamespaceBundles targetNsBundle = splitBundlesPair.getLeft(); + List splitBundles = Collections.unmodifiableList(splitBundlesPair.getRight()); + List successPublishedBundles = + Collections.synchronizedList(new ArrayList<>(splitBundles.size())); + List> futures = new ArrayList<>(splitBundles.size()); + for (NamespaceBundle sBundle : splitBundles) { + futures.add(pubAsync(sBundle.toString(), next).thenAccept(__ -> successPublishedBundles.add(sBundle))); + } + NamespaceName nsname = bundle.getNamespaceObject(); + FutureUtil.waitForAll(futures) + .thenCompose(__ -> namespaceService.updateNamespaceBundles(nsname, targetNsBundle)) + .thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(nsname, targetNsBundle)) + .thenRun(() -> { + bundleFactory.invalidateBundleCache(bundle.getNamespaceObject()); + updateFuture.complete(splitBundles); + }).exceptionally(e -> { + // Clean the new bundle when has exception. + List> futureList = new ArrayList<>(); + for (NamespaceBundle sBundle : successPublishedBundles) { + futureList.add(tombstoneAsync(sBundle.toString()).thenAccept(__ -> {})); + } + FutureUtil.waitForAll(futureList) + .whenComplete((__, ex) -> { + if (ex != null) { + log.warn("Clean new bundles failed,", ex); + } + updateFuture.completeExceptionally(e); + }); + return null; + }); + }).exceptionally(e -> { + updateFuture.completeExceptionally(e); + return null; + }); + + updateFuture.thenAccept(r -> { + // Free the old bundle + tombstoneAsync(serviceUnit).thenRun(() -> { + // Update bundled_topic cache for load-report-generation + pulsar.getBrokerService().refreshTopicToStatsMaps(bundle); + // TODO: Update the load data immediately if needed. + completionFuture.complete(null); + double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); + log.info("Successfully split {} parent namespace-bundle to {} in {} ms", serviceUnit, r, + splitBundleTime); + }).exceptionally(e -> { + double splitBundleTime = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)); + String msg = format("Failed to free bundle %s in %s ms, under namespace [%s] with error %s", + bundle.getNamespaceObject().toString(), splitBundleTime, bundle, e.getMessage()); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); + return null; + }); + }).exceptionally(ex -> { + // Retry several times on BadVersion + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if ((throwable instanceof MetadataStoreException.BadVersionException) + && (counter.incrementAndGet() < NamespaceService.BUNDLE_SPLIT_RETRY_LIMIT)) { + pulsar.getExecutor().schedule(() -> splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, + bundle, serviceUnit, data, counter, startTime, completionFuture), 100, MILLISECONDS); + } else if (throwable instanceof IllegalArgumentException) { + completionFuture.completeExceptionally(throwable); + } else { + // Retry enough, or meet other exception + String msg = format("Bundle: %s not success update nsBundles, counter %d, reason %s", + bundle.toString(), counter.get(), throwable.getMessage()); + completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg)); + } + return null; + }); } public void handleMetadataSessionEvent(SessionEvent e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index abbabcd3b00a1..245c3f896af1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -124,7 +124,7 @@ public class NamespaceService implements AutoCloseable { private final NamespaceBundleFactory bundleFactory; private final String host; - private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7; + public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7; public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor"; public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)"); @@ -828,18 +828,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, CompletableFuture completionFuture, NamespaceBundleSplitAlgorithm splitAlgorithm, List boundaries) { - BundleSplitOption bundleSplitOption; - if (config.getDefaultNamespaceBundleSplitAlgorithm() - .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { - Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); - bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, - topicStatsMap, - config.getLoadBalancerNamespaceBundleMaxMsgRate(), - config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), - config.getFlowOrQpsDifferenceThresholdPercentage()); - } else { - bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); - } + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> { CompletableFuture> updateFuture = new CompletableFuture<>(); @@ -957,6 +946,61 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, }); } + /** + * Get the split boundary's. + * + * @param bundle The bundle to split. + * @param boundaries The specified positions, + * use for {@link org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}. + * @return A pair, left is target namespace bundle, right is split bundles. + */ + public CompletableFuture>> getSplitBoundary( + NamespaceBundle bundle, List boundaries) { + BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config); + NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = + getNamespaceBundleSplitAlgorithmByName(config.getDefaultNamespaceBundleSplitAlgorithm()); + CompletableFuture> splitBoundary = + nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption); + return splitBoundary.thenCompose(splitBoundaries -> { + if (splitBoundaries == null || splitBoundaries.size() == 0) { + LOG.info("[{}] No valid boundary found in {} to split bundle {}", + bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange()); + return CompletableFuture.completedFuture(null); + } + return pulsar.getNamespaceService().getNamespaceBundleFactory() + .splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries); + }); + } + + private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle, + List boundaries, + ServiceConfiguration config) { + BundleSplitOption bundleSplitOption; + if (config.getDefaultNamespaceBundleSplitAlgorithm() + .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) { + Map topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle); + bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries, + topicStatsMap, + config.getLoadBalancerNamespaceBundleMaxMsgRate(), + config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(), + config.getFlowOrQpsDifferenceThresholdPercentage()); + } else { + bundleSplitOption = new BundleSplitOption(this, bundle, boundaries); + } + return bundleSplitOption; + } + + public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { + NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm()); + } + if (algorithm == null) { + algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; + } + return algorithm; + } + /** * Update new bundle-range to admin/policies/namespace. * Update may fail because of concurrent write to Zookeeper. @@ -965,7 +1009,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle, * @param nsBundles * @throws Exception */ - private CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName nsname, + public CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName nsname, NamespaceBundles nsBundles) { Objects.requireNonNull(nsname); Objects.requireNonNull(nsBundles); @@ -994,7 +1038,7 @@ private CompletableFuture updateNamespaceBundlesForPolicies(NamespaceName * @param nsBundles * @throws Exception */ - private CompletableFuture updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { + public CompletableFuture updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { Objects.requireNonNull(nsname); Objects.requireNonNull(nsBundles); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 660999365c428..327afa3cb8891 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -38,9 +38,12 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -61,6 +64,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -69,12 +73,14 @@ import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.awaitility.Awaitility; @@ -113,9 +119,9 @@ protected void setup() throws Exception { pulsar1 = pulsar; additionalPulsarTestContext = createAdditionalPulsarTestContext(getDefaultConf()); pulsar2 = additionalPulsarTestContext.getPulsarService(); - channel1 = new ServiceUnitStateChannelImpl(pulsar1); + channel1 = spy(new ServiceUnitStateChannelImpl(pulsar1)); channel1.start(); - channel2 = new ServiceUnitStateChannelImpl(pulsar2); + channel2 = spy(new ServiceUnitStateChannelImpl(pulsar2)); channel2.start(); lookupServiceAddress1 = (String) FieldUtils.readDeclaredField(channel1, "lookupServiceAddress", true); @@ -480,7 +486,7 @@ public void unloadTestWhenDestBrokerFails() } @Test(priority = 6) - public void splitTest() throws Exception { + public void splitAndRetryTest() throws Exception { channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); @@ -490,17 +496,52 @@ public void splitTest() throws Exception { assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1)); assertTrue(ownerAddr1.isPresent()); + NamespaceService namespaceService = spy(pulsar1.getNamespaceService()); + CompletableFuture future = new CompletableFuture<>(); + int badVersionExceptionCount = 3; + AtomicInteger count = new AtomicInteger(badVersionExceptionCount); + future.completeExceptionally(new MetadataStoreException.BadVersionException("BadVersion")); + doAnswer(invocationOnMock -> { + if (count.decrementAndGet() > 0) { + return future; + } + // Call the real method + reset(namespaceService); + return future; + }).when(namespaceService).updateNamespaceBundles(any(), any()); + doReturn(namespaceService).when(pulsar1).getNamespaceService(); + Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>()); channel1.publishSplitEventAsync(split); waitUntilNewOwner(channel1, bundle, null); waitUntilNewOwner(channel2, bundle, null); - // TODO: assert child bundle ownerships in the channels. - validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0); + validateHandlerCounters(channel2, 1, 0, 9, 0, 0, 0, 1, 0, 7, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); + // Verify the retry count + verify(((ServiceUnitStateChannelImpl) channel1), times(badVersionExceptionCount + 1)) + .splitServiceUnitOnceAndRetry(any(), any(), any(), any(), any(), any(), anyLong(), any()); + + // Assert child bundle ownerships in the channels. + String childBundle1 = "public/default/0x7fffffff_0xffffffff"; + String childBundle2 = "public/default/0x00000000_0x7fffffff"; + + waitUntilNewOwner(channel1, childBundle1, lookupServiceAddress1); + waitUntilNewOwner(channel1, childBundle2, lookupServiceAddress1); + waitUntilNewOwner(channel2, childBundle1, lookupServiceAddress1); + waitUntilNewOwner(channel2, childBundle2, lookupServiceAddress1); + assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle1).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel1.getOwnerAsync(childBundle2).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle1).get()); + assertEquals(Optional.of(lookupServiceAddress1), channel2.getOwnerAsync(childBundle2).get()); + + cleanTableView(channel1, childBundle1); + cleanTableView(channel2, childBundle1); + cleanTableView(channel1, childBundle2); + cleanTableView(channel2, childBundle2); } @Test(priority = 7)