Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for setting geo-replication clusters on topic level #12136

Merged
merged 5 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -1961,34 +1960,6 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr
}
}

/**
* It validates that peer-clusters can't coexist in replication-clusters.
*
* @param clusterName: given cluster whose peer-clusters can't be present into replication-cluster list
* @param replicationClusters: replication-cluster list
*/
private void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
try {
ClusterData clusterData = clusterResources().getCluster(clusterName).orElseThrow(
() -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
Set<String> peerClusters = clusterData.getPeerClusterNames();
if (peerClusters != null && !peerClusters.isEmpty()) {
SetView<String> conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters);
if (!conflictPeerClusters.isEmpty()) {
log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(),
clusterName, conflictPeerClusters);
throw new RestException(Status.CONFLICT,
String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName,
conflictPeerClusters, replicationClusters));
}
}
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), clusterName, e);
}
}

protected BundlesData validateBundlesData(BundlesData initialBundles) {
SortedSet<String> partitions = new TreeSet<String>();
for (String partition : initialBundles.getBoundaries()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2816,6 +2816,60 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
});
}

protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

Set<String> replicationClusters = Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of replication clusters");
}
Set<String> clusters = clusters();
for (String clusterId : replicationClusters) {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN, "Invalid cluster id: " + clusterId);
}
validatePeerClusterConflict(clusterId, replicationClusters);
validateClusterForTenant(namespaceName.getTenant(), clusterId);
}

return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenRun(() -> {
log.info("[{}] Successfully set replication clusters for namespace={}, "
+ "topic={}, clusters={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
topicPolicies.getReplicationClusters());
});
}
);
}

protected CompletableFuture<Void> internalRemoveReplicationClusters() {
validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicationClusters(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.thenRun(() -> {
log.info("[{}] Successfully set replication clusters for namespace={}, "
+ "topic={}, clusters={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
topicPolicies.getReplicationClusters());
});
}
);
}

protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
return getTopicPoliciesAsyncWithRetry(topicName)
.thenApply(op -> op.map(TopicPolicies::getDeduplicationEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,87 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/replication")
@ApiOperation(value = "Get the replication clusters for a topic")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message =
"Topic level policy is disabled, enable the topic level policy and retry")})
public void getReplicationClusters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
.thenAccept(op -> {
asyncResponse.resume(op.map(TopicPolicies::getReplicationClustersSet).orElseGet(() -> {
if (applied) {
return getNamespacePolicies(namespaceName).replication_clusters;
}
return null;
}));
})
.exceptionally(ex -> {
handleTopicPolicyException("getReplicationClusters", ex, asyncResponse);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/replication")
@ApiOperation(value = "Set the replication clusters for a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Topic is not global or invalid cluster ids")})
public void setReplicationClusters(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "List of replication clusters", required = true) List<String> clusterIds) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalSetReplicationClusters(clusterIds))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setReplicationClusters", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/replication")
@ApiOperation(value = "Remove the replication clusters from a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeReplicationClusters(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
.thenCompose(__ -> internalRemoveReplicationClusters())
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("removeReplicationClusters", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Get message TTL in seconds for a topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,33 +1341,15 @@ public CompletableFuture<Void> checkReplication() {
log.debug("[{}] Checking replication status", name);
}

CompletableFuture<Policies> policiesFuture = brokerService.pulsar().getPulsarResources()
.getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return FutureUtil.failedFuture(
new ServerMetadataException(
new MetadataStoreException.NotFoundException()));
}

return CompletableFuture.completedFuture(optPolicies.get());
});
CompletableFuture<List<String>> replicationClustersFuture = getReplicationClusters(name);

CompletableFuture<Integer> ttlFuture = getMessageTTL();

return CompletableFuture.allOf(policiesFuture, ttlFuture)
return CompletableFuture.allOf(replicationClustersFuture, ttlFuture)
.thenCompose(__ -> {
Policies policies = policiesFuture.join();
List<String> configuredClusters = replicationClustersFuture.join();
int newMessageTTLinSeconds = ttlFuture.join();

Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = Sets.newTreeSet(policies.replication_clusters);
} else {
configuredClusters = Collections.emptySet();
}

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
Expand Down Expand Up @@ -1408,6 +1390,32 @@ public CompletableFuture<Void> checkReplication() {
});
}

@VisibleForTesting
public CompletableFuture<List<String>> getReplicationClusters(TopicName topicName) {
return brokerService.pulsar()
.getTopicPoliciesService()
.getTopicPoliciesAsyncWithRetry(topicName, null, brokerService.pulsar().getExecutor())
.thenCompose(topicPolicies -> {
if (!topicPolicies.isPresent() || topicPolicies.get().getReplicationClusters() == null) {
return brokerService.pulsar().getPulsarResources()
.getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return FutureUtil.failedFuture(
new ServerMetadataException(
new MetadataStoreException.NotFoundException()));
}

return CompletableFuture.completedFuture(
Lists.newArrayList(optPolicies.get().replication_clusters));
});
} else {
return CompletableFuture.completedFuture(topicPolicies.get().getReplicationClusters());
}
});
}

@Override
public void checkMessageExpiry() {
getMessageTTL().thenAccept(messageTtlInSeconds -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -306,6 +307,34 @@ protected static void validateAdminAccessForTenant(PulsarService pulsar, String
}
}

/**
* It validates that peer-clusters can't coexist in replication-clusters.
*
* @clusterName: given cluster whose peer-clusters can't be present into replication-cluster list
* @replicationClusters: replication-cluster list
*/
protected void validatePeerClusterConflict(String clusterName, Set<String> replicationClusters) {
try {
ClusterData clusterData = clusterResources().getCluster(clusterName).orElseThrow(
() -> new RestException(Status.PRECONDITION_FAILED, "Invalid replication cluster " + clusterName));
Set<String> peerClusters = clusterData.getPeerClusterNames();
if (peerClusters != null && !peerClusters.isEmpty()) {
Sets.SetView<String> conflictPeerClusters = Sets.intersection(peerClusters, replicationClusters);
if (!conflictPeerClusters.isEmpty()) {
log.warn("[{}] {}'s peer cluster can't be part of replication clusters {}", clientAppId(),
clusterName, conflictPeerClusters);
throw new RestException(Status.CONFLICT,
String.format("%s's peer-clusters %s can't be part of replication-clusters %s", clusterName,
conflictPeerClusters, replicationClusters));
}
}
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.warn("[{}] Failed to get cluster-data for {}", clientAppId(), clusterName, e);
}
}

protected void validateClusterForTenant(String tenant, String cluster) {
TenantInfo tenantInfo;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -2643,6 +2644,25 @@ public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
});
}

@Test(timeOut = 30000)
public void testReplicatorClusterApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();

assertNull(admin.topics().getReplicationClusters(topic, false));

List<String> clusters = Lists.newArrayList();
clusters.add("test");
admin.topics().setReplicationClusters(topic, clusters);
Awaitility.await().untilAsserted(()
-> assertEquals(admin.topics().getReplicationClusters(topic, false), clusters));

admin.topics().removeReplicationClusters(topic);
Awaitility.await().untilAsserted(()
-> assertNull(admin.topics().getReplicationClusters(topic, false)));
}

@Test
public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down
Loading