From 3def245ff919446043fee083f6393493198b3cf6 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 24 Sep 2021 10:15:21 +0800 Subject: [PATCH 1/3] Support for setting geo-replication clusters on topic level --- .../broker/admin/impl/NamespacesBase.java | 29 ------- .../admin/impl/PersistentTopicsBase.java | 55 +++++++++++++ .../broker/admin/v2/PersistentTopics.java | 81 +++++++++++++++++++ .../service/persistent/PersistentTopic.java | 44 +++++----- .../pulsar/broker/web/PulsarWebResource.java | 29 +++++++ .../broker/admin/TopicPoliciesTest.java | 20 +++++ .../apache/pulsar/client/admin/Topics.java | 58 +++++++++++++ .../client/admin/internal/TopicsImpl.java | 77 ++++++++++++++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 48 +++++++++++ .../common/policies/data/TopicPolicies.java | 10 +-- 10 files changed, 396 insertions(+), 55 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 94812dc5a7498..70e6dc2e4a692 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -24,7 +24,6 @@ import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; import java.lang.reflect.Field; import java.net.URI; import java.net.URL; @@ -1953,34 +1952,6 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr } } - /** - * It validates that peer-clusters can't coexist in replication-clusters. - * - * @param clusterName: given cluster whose peer-clusters can't be present into replication-cluster list - * @param replicationClusters: replication-cluster list - */ - private void validatePeerClusterConflict(String clusterName, Set replicationClusters) { - try { - ClusterData clusterData = clusterResources().getCluster(clusterName).orElseThrow( - () -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName)); - Set peerClusters = clusterData.getPeerClusterNames(); - if (peerClusters != null && !peerClusters.isEmpty()) { - SetView conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters); - if (!conflictPeerClusters.isEmpty()) { - log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(), - clusterName, conflictPeerClusters); - throw new RestException(Status.CONFLICT, - String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName, - conflictPeerClusters, replicationClusters)); - } - } - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), clusterName, e); - } - } - protected BundlesData validateBundlesData(BundlesData initialBundles) { SortedSet partitions = new TreeSet(); for (String partition : initialBundles.getBoundaries()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9000f3d8b5ea0..8483a44fdc4a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -2708,6 +2709,60 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu }); } + protected CompletableFuture internalSetReplicationClusters(List clusterIds) { + validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + + Set replicationClusters = Sets.newHashSet(clusterIds); + if (replicationClusters.contains("global")) { + throw new RestException(Status.PRECONDITION_FAILED, + "Cannot specify global in the list of replication clusters"); + } + Set clusters = clusters(); + for (String clusterId : replicationClusters) { + if (!clusters.contains(clusterId)) { + throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId); + } + validatePeerClusterConflict(clusterId, replicationClusters); + validateClusterForTenant(namespaceName.getTenant(), clusterId); + } + + return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicationClusters(replicationClusters); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) + .thenRun(() -> { + log.info("[{}] Successfully set replication clusters for namespace={}, " + + "topic={}, clusters={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + topicPolicies.getReplicationClusters()); + }); + } + ); + } + + protected CompletableFuture internalRemoveReplicationClusters() { + validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + + return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicationClusters(Collections.emptySet()); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) + .thenRun(() -> { + log.info("[{}] Successfully set replication clusters for namespace={}, " + + "topic={}, clusters={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + topicPolicies.getReplicationClusters()); + }); + } + ); + } + protected CompletableFuture internalGetDeduplication(boolean applied) { return getTopicPoliciesAsyncWithRetry(topicName) .thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1e26907cf6c43..70e6e2fae4179 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1651,6 +1651,87 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/replication") + @ApiOperation(value = "Get the replication clusters for a topic") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = + "Topic level policy is disabled, enable the topic level policy and retry")}) + public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) + .thenAccept(op -> { + asyncResponse.resume(op.map(TopicPolicies::getReplicationClusters).orElseGet(() -> { + if (applied) { + return getNamespacePolicies(namespaceName).replication_clusters; + } + return null; + })); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getReplicationClusters", ex, asyncResponse); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/replication") + @ApiOperation(value = "Set the replication clusters for a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 412, message = "Topic is not global or invalid cluster ids")}) + public void setReplicationClusters( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "List of replication clusters", required = true) List clusterIds) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetReplicationClusters(clusterIds)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setReplicationClusters", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/replication") + @ApiOperation(value = "Remove the replication clusters from a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, to enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeReplicationClusters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveReplicationClusters()) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("removeReplicationClusters", ex, asyncResponse); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/messageTTL") @ApiOperation(value = "Get message TTL in seconds for a topic") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f403a87ca130..7f68d5c6a944c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -72,6 +72,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieId; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -1334,33 +1335,36 @@ public CompletableFuture checkReplication() { log.debug("[{}] Checking replication status", name); } - CompletableFuture policiesFuture = brokerService.pulsar().getPulsarResources() - .getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenCompose(optPolicies -> { - if (!optPolicies.isPresent()) { - return FutureUtil.failedFuture( - new ServerMetadataException( - new MetadataStoreException.NotFoundException())); - } + CompletableFuture> replicationClustersFuture = brokerService.pulsar() + .getTopicPoliciesService() + .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor()) + .thenCompose(topicPolicies -> { + if (!topicPolicies.isPresent() + || CollectionUtils.isEmpty(topicPolicies.get().getReplicationClusters())) { + return brokerService.pulsar().getPulsarResources() + .getNamespaceResources() + .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) + .thenCompose(optPolicies -> { + if (!optPolicies.isPresent()) { + return FutureUtil.failedFuture( + new ServerMetadataException( + new MetadataStoreException.NotFoundException())); + } - return CompletableFuture.completedFuture(optPolicies.get()); - }); + return CompletableFuture.completedFuture(optPolicies.get().replication_clusters); + }); + } else { + return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters()); + } + }); CompletableFuture ttlFuture = getMessageTTL(); - return CompletableFuture.allOf(policiesFuture, ttlFuture) + return CompletableFuture.allOf(replicationClustersFuture, ttlFuture) .thenCompose(__ -> { - Policies policies = policiesFuture.join(); + Set configuredClusters = replicationClustersFuture.join(); int newMessageTTLinSeconds = ttlFuture.join(); - Set configuredClusters; - if (policies.replication_clusters != null) { - configuredClusters = Sets.newTreeSet(policies.replication_clusters); - } else { - configuredClusters = Collections.emptySet(); - } - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); // if local cluster is removed from global namespace cluster-list : then delete topic forcefully diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 95099c4d931f6..b741a0dff3cc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -25,6 +25,7 @@ import com.google.common.collect.BoundType; import com.google.common.collect.Lists; import com.google.common.collect.Range; +import com.google.common.collect.Sets; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; @@ -306,6 +307,34 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String } } + /** + * It validates that peer-clusters can't coexist in replication-clusters. + * + * @clusterName: given cluster whose peer-clusters can't be present into replication-cluster list + * @replicationClusters: replication-cluster list + */ + protected void validatePeerClusterConflict(String clusterName, Set replicationClusters) { + try { + ClusterData clusterData = clusterResources().getCluster(clusterName).orElseThrow( + () -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName)); + Set peerClusters = clusterData.getPeerClusterNames(); + if (peerClusters != null && !peerClusters.isEmpty()) { + Sets.SetView conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters); + if (!conflictPeerClusters.isEmpty()) { + log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(), + clusterName, conflictPeerClusters); + throw new RestException(Status.CONFLICT, + String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName, + conflictPeerClusters, replicationClusters)); + } + } + } catch (RestException re) { + throw re; + } catch (Exception e) { + log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), clusterName, e); + } + } + protected void validateClusterForTenant(String tenant, String cluster) { TenantInfo tenantInfo; try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index e5f6e5d1a343d..f94297a82dab4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.api.client.util.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.lang.reflect.Field; @@ -2641,4 +2642,23 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() { }); } + @Test(timeOut = 30000) + public void testReplicatorClusterApi() throws Exception { + final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); + // init cache + pulsarClient.newProducer().topic(topic).create().close(); + + assertNull(admin.topics().getReplicationClusters(topic, false)); + + List clusters = Lists.newArrayList(); + clusters.add("test"); + admin.topics().setReplicationClusters(topic, clusters); + Awaitility.await().untilAsserted(() + -> assertEquals(admin.topics().getReplicationClusters(topic, false), clusters)); + + admin.topics().removeReplicationClusters(topic); + Awaitility.await().untilAsserted(() + -> assertNull(admin.topics().getReplicationClusters(topic, false))); + } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0d041df159aad..8c4780e542c22 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -3356,4 +3356,62 @@ CompletableFuture> examineMessageAsync(String topic, String init * @return */ CompletableFuture setReplicatedSubscriptionStatusAsync(String topic, String subName, boolean enabled); + + /** + * Get the replication clusters for a topic. + * + * @param topic + * @param applied + * @return + * @throws PulsarAdminException + */ + Set getReplicationClusters(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get the replication clusters for a topic asynchronously. + * + * @param topic + * @param applied + * @return + * @throws PulsarAdminException + */ + CompletableFuture> getReplicationClustersAsync(String topic, boolean applied); + + /** + * Set the replication clusters for the topic. + * + * @param topic + * @param clusterIds + * @return + * @throws PulsarAdminException + */ + void setReplicationClusters(String topic, List clusterIds) throws PulsarAdminException; + + /** + * Set the replication clusters for the topic asynchronously. + * + * @param topic + * @param clusterIds + * @return + * @throws PulsarAdminException + */ + CompletableFuture setReplicationClustersAsync(String topic, List clusterIds); + + /** + * Remove the replication clusters for the topic. + * + * @param topic + * @return + * @throws PulsarAdminException + */ + void removeReplicationClusters(String topic) throws PulsarAdminException; + + /** + * Remove the replication clusters for the topic asynchronously. + * + * @param topic + * @return + * @throws PulsarAdminException + */ + CompletableFuture removeReplicationClustersAsync(String topic); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 7b518e2a51d22..97e9f69d61950 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -3777,5 +3777,82 @@ public CompletableFuture setReplicatedSubscriptionStatusAsync(String topic return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); } + @Override + public Set getReplicationClusters(String topic, boolean applied) throws PulsarAdminException { + try { + return getReplicationClustersAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture> getReplicationClustersAsync(String topic, boolean applied) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "replication"); + path = path.queryParam("applied", applied); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(Set clusterIds) { + future.complete(clusterIds); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public void setReplicationClusters(String topic, List clusterIds) throws PulsarAdminException { + try { + setReplicationClustersAsync(topic, clusterIds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setReplicationClustersAsync(String topic, List clusterIds) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "replication"); + return asyncPostRequest(path, Entity.entity(clusterIds, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeReplicationClusters(String topic) throws PulsarAdminException { + try { + removeReplicationClustersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeReplicationClustersAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "replication"); + return asyncDeleteRequest(path); + } + private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index abd70ace1ddba..b436f84962b7c 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -226,6 +226,10 @@ public CmdTopics(Supplier admin) { jcommander.addCommand("set-replicated-subscription-status", new SetReplicatedSubscriptionStatus()); + jcommander.addCommand("get-replication-clusters", new GetReplicationClusters()); + jcommander.addCommand("set-replication-clusters", new SetReplicationClusters()); + jcommander.addCommand("remove-replication-clusters", new RemoveReplicationClusters()); + initDeprecatedCommands(); } @@ -1239,6 +1243,50 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get the replication clusters for a topic") + private class GetReplicationClusters extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopics().getReplicationClusters(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Set the replication clusters for a topic") + private class SetReplicationClusters extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Parameter(names = { "--clusters", + "-c" }, description = "Replication Cluster Ids list (comma separated values)", required = true) + private String clusterIds; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + List clusters = Lists.newArrayList(clusterIds.split(",")); + getTopics().setReplicationClusters(persistentTopic, clusters); + } + } + + @Parameters(commandDescription = "Remove the replication clusters for a topic") + private class RemoveReplicationClusters extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopics().removeReplicationClusters(persistentTopic); + } + } + @Parameters(commandDescription = "Get the delayed delivery policy for a topic") private class GetDelayedDelivery extends CliCommand { @Parameter(description = "tenant/namespace/topic", required = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 63d8352799868..cdb83e0ff7630 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -19,16 +19,15 @@ package org.apache.pulsar.common.policies.data; import com.google.common.collect.Maps; - import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; -import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.Setter; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; @@ -41,15 +40,14 @@ @Builder @NoArgsConstructor @AllArgsConstructor -@Getter -@Setter public class TopicPolicies { @Builder.Default private Map backLogQuotaMap = Maps.newHashMap(); @Builder.Default private List subscriptionTypesEnabled = new ArrayList<>(); - + @Builder.Default + private Set replicationClusters = new HashSet<>(); private PersistencePolicies persistence; private RetentionPolicies retentionPolicies; private Boolean deduplicationEnabled; From 7801ed959f926c21fbd7ea8d9771d718bf0462d1 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 24 Sep 2021 15:29:07 +0800 Subject: [PATCH 2/3] Change type of replicationClusters from Set to List. Because ARVO deserialized error. --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++-- .../apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- .../pulsar/broker/service/persistent/PersistentTopic.java | 7 ++++--- .../org/apache/pulsar/broker/admin/TopicPoliciesTest.java | 2 +- .../apache/pulsar/common/policies/data/TopicPolicies.java | 8 ++++++-- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 8483a44fdc4a6..0260b3dd0ee8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2729,7 +2729,7 @@ protected CompletableFuture internalSetReplicationClusters(List cl return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(replicationClusters); + topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters)); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) .thenRun(() -> { log.info("[{}] Successfully set replication clusters for namespace={}, " @@ -2749,7 +2749,7 @@ protected CompletableFuture internalRemoveReplicationClusters() { return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(Collections.emptySet()); + topicPolicies.setReplicationClusters(Collections.emptyList()); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) .thenRun(() -> { log.info("[{}] Successfully set replication clusters for namespace={}, " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 70e6e2fae4179..f4a0f6f333595 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1669,7 +1669,7 @@ public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse, preValidation(authoritative) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) .thenAccept(op -> { - asyncResponse.resume(op.map(TopicPolicies::getReplicationClusters).orElseGet(() -> { + asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> { if (applied) { return getNamespacePolicies(namespaceName).replication_clusters; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7f68d5c6a944c..971413f9e9419 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1335,7 +1335,7 @@ public CompletableFuture checkReplication() { log.debug("[{}] Checking replication status", name); } - CompletableFuture> replicationClustersFuture = brokerService.pulsar() + CompletableFuture> replicationClustersFuture = brokerService.pulsar() .getTopicPoliciesService() .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor()) .thenCompose(topicPolicies -> { @@ -1351,7 +1351,8 @@ public CompletableFuture checkReplication() { new MetadataStoreException.NotFoundException())); } - return CompletableFuture.completedFuture(optPolicies.get().replication_clusters); + return CompletableFuture.completedFuture( + Lists.newArrayList(optPolicies.get().replication_clusters)); }); } else { return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters()); @@ -1362,7 +1363,7 @@ public CompletableFuture checkReplication() { return CompletableFuture.allOf(replicationClustersFuture, ttlFuture) .thenCompose(__ -> { - Set configuredClusters = replicationClustersFuture.join(); + List configuredClusters = replicationClustersFuture.join(); int newMessageTTLinSeconds = ttlFuture.join(); String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f94297a82dab4..f2fe0b7b6c058 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -2658,7 +2658,7 @@ public void testReplicatorClusterApi() throws Exception { admin.topics().removeReplicationClusters(topic); Awaitility.await().untilAsserted(() - -> assertNull(admin.topics().getReplicationClusters(topic, false))); + -> assertTrue(admin.topics().getReplicationClusters(topic, false).isEmpty())); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index cdb83e0ff7630..952c7c8654806 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -19,8 +19,8 @@ package org.apache.pulsar.common.policies.data; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,7 +47,7 @@ public class TopicPolicies { @Builder.Default private List subscriptionTypesEnabled = new ArrayList<>(); @Builder.Default - private Set replicationClusters = new HashSet<>(); + private List replicationClusters = new ArrayList<>(); private PersistencePolicies persistence; private RetentionPolicies retentionPolicies; private Boolean deduplicationEnabled; @@ -162,4 +162,8 @@ public boolean isPublishRateSet() { public boolean isSubscribeRateSet() { return subscribeRate != null; } + + public Set getReplicationClustersSet() { + return Sets.newTreeSet(this.replicationClusters); + } } From 781708e6367a8bbbcc6e84dc1f9d93f713af76b8 Mon Sep 17 00:00:00 2001 From: technoboy Date: Mon, 27 Sep 2021 16:15:41 +0800 Subject: [PATCH 3/3] Add test --- .../admin/impl/PersistentTopicsBase.java | 3 +- .../service/persistent/PersistentTopic.java | 51 ++++++++++--------- .../broker/admin/TopicPoliciesTest.java | 2 +- .../broker/service/PersistentTopicTest.java | 50 ++++++++++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 9 ++++ .../common/policies/data/TopicPolicies.java | 5 +- 6 files changed, 90 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0260b3dd0ee8c..6736ebeaaf225 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Base64; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -2749,7 +2748,7 @@ protected CompletableFuture internalRemoveReplicationClusters() { return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(Collections.emptyList()); + topicPolicies.setReplicationClusters(null); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) .thenRun(() -> { log.info("[{}] Successfully set replication clusters for namespace={}, " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 971413f9e9419..bc66a1720845c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -72,7 +72,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieId; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -1335,29 +1334,7 @@ public CompletableFuture checkReplication() { log.debug("[{}] Checking replication status", name); } - CompletableFuture> replicationClustersFuture = brokerService.pulsar() - .getTopicPoliciesService() - .getTopicPoliciesAsyncWithRetry(name, null, brokerService.pulsar().getExecutor()) - .thenCompose(topicPolicies -> { - if (!topicPolicies.isPresent() - || CollectionUtils.isEmpty(topicPolicies.get().getReplicationClusters())) { - return brokerService.pulsar().getPulsarResources() - .getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenCompose(optPolicies -> { - if (!optPolicies.isPresent()) { - return FutureUtil.failedFuture( - new ServerMetadataException( - new MetadataStoreException.NotFoundException())); - } - - return CompletableFuture.completedFuture( - Lists.newArrayList(optPolicies.get().replication_clusters)); - }); - } else { - return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters()); - } - }); + CompletableFuture> replicationClustersFuture = getReplicationClusters(name); CompletableFuture ttlFuture = getMessageTTL(); @@ -1406,6 +1383,32 @@ public CompletableFuture checkReplication() { }); } + @VisibleForTesting + public CompletableFuture> getReplicationClusters(TopicName topicName) { + return brokerService.pulsar() + .getTopicPoliciesService() + .getTopicPoliciesAsyncWithRetry(topicName, null, brokerService.pulsar().getExecutor()) + .thenCompose(topicPolicies -> { + if (!topicPolicies.isPresent() || topicPolicies.get().getReplicationClusters() == null) { + return brokerService.pulsar().getPulsarResources() + .getNamespaceResources() + .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) + .thenCompose(optPolicies -> { + if (!optPolicies.isPresent()) { + return FutureUtil.failedFuture( + new ServerMetadataException( + new MetadataStoreException.NotFoundException())); + } + + return CompletableFuture.completedFuture( + Lists.newArrayList(optPolicies.get().replication_clusters)); + }); + } else { + return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters()); + } + }); + } + @Override public void checkMessageExpiry() { getMessageTTL().thenAccept(messageTtlInSeconds -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index f2fe0b7b6c058..f94297a82dab4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -2658,7 +2658,7 @@ public void testReplicatorClusterApi() throws Exception { admin.topics().removeReplicationClusters(topic); Awaitility.await().untilAsserted(() - -> assertTrue(admin.topics().getReplicationClusters(topic, false).isEmpty())); + -> assertNull(admin.topics().getReplicationClusters(topic, false))); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index cbffa3f20d565..e39b0abce1570 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -46,9 +46,11 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -117,6 +119,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; @@ -2208,4 +2211,51 @@ private ByteBuf getMessageWithMetadata(byte[] data) { messageData.writeTo(headers); return ByteBufPair.coalesce(ByteBufPair.get(headers, payload)); } + + @Test + public void testGetReplicationClusters() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + TopicName name = TopicName.get(successTopicName); + CompletableFuture> replicationClusters = topic.getReplicationClusters(name); + try { + replicationClusters.get(); + fail("Should have failed"); + } catch (ExecutionException ex) { + assertTrue(ex.getCause() instanceof BrokerServiceException.ServerMetadataException); + } + + PulsarResources pulsarResources = spy(new PulsarResources(store, store)); + NamespaceResources nsr = spy(new NamespaceResources(store, store, 30)); + doReturn(nsr).when(pulsarResources).getNamespaceResources(); + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + CompletableFuture> policiesFuture = new CompletableFuture<>(); + Policies policies = new Policies(); + Set namespaceClusters = new HashSet<>(); + namespaceClusters.add("namespace-cluster"); + policies.replication_clusters = namespaceClusters; + Optional optionalPolicies = Optional.of(policies); + policiesFuture.complete(optionalPolicies); + doReturn(policiesFuture).when(nsr).getPoliciesAsync(any()); + + topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + replicationClusters = topic.getReplicationClusters(name); + + assertEquals(replicationClusters.get(), namespaceClusters); + + TopicPoliciesService topicPoliciesService = mock(TopicPoliciesService.class); + doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService(); + CompletableFuture> topicPoliciesFuture = new CompletableFuture<>(); + TopicPolicies topicPolicies = new TopicPolicies(); + List topicClusters = new ArrayList<>(); + topicClusters.add("topic-cluster"); + topicPolicies.setReplicationClusters(topicClusters); + Optional optionalTopicPolicies = Optional.of(topicPolicies); + topicPoliciesFuture.complete(optionalTopicPolicies); + when(topicPoliciesService.getTopicPoliciesAsyncWithRetry(any(), any(), any())).thenReturn(topicPoliciesFuture); + + topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + replicationClusters = topic.getReplicationClusters(name); + + assertEquals(replicationClusters.get(), topicClusters); + } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a41f6568ed44d..2407f337291ba 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1289,6 +1289,15 @@ public boolean matches(Long timestamp) { cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getMaxConsumers("persistent://myprop/clust/ns1/ds1", true); + + cmdTopics.run(split("get-replication-clusters persistent://myprop/clust/ns1/ds1 --applied")); + verify(mockTopics).getReplicationClusters("persistent://myprop/clust/ns1/ds1", true); + + cmdTopics.run(split("set-replication-clusters persistent://myprop/clust/ns1/ds1 -c test")); + verify(mockTopics).setReplicationClusters("persistent://myprop/clust/ns1/ds1", Lists.newArrayList("test")); + + cmdTopics.run(split("remove-replication-clusters persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeReplicationClusters("persistent://myprop/clust/ns1/ds1"); } private static LedgerInfo newLedger(long id, long entries, long size) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 952c7c8654806..4d7c779810195 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -46,8 +46,7 @@ public class TopicPolicies { private Map backLogQuotaMap = Maps.newHashMap(); @Builder.Default private List subscriptionTypesEnabled = new ArrayList<>(); - @Builder.Default - private List replicationClusters = new ArrayList<>(); + private List replicationClusters; private PersistencePolicies persistence; private RetentionPolicies retentionPolicies; private Boolean deduplicationEnabled; @@ -164,6 +163,6 @@ public boolean isSubscribeRateSet() { } public Set getReplicationClustersSet() { - return Sets.newTreeSet(this.replicationClusters); + return replicationClusters != null ? Sets.newTreeSet(this.replicationClusters) : null; } }