Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix read schema compatibility strategy priority #13938

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1290,12 +1290,10 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
# if you enable this setting, it will cause non-java clients failed to produce.
isSchemaValidationEnforced=false

# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`,
# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`.
# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# The schema compatibility strategy in broker level.
# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD,
# FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
# default : UNDEFINED
schemaCompatibilityStrategy=
schemaCompatibilityStrategy=FULL

### --- Ledger Offloading --- ###

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2195,10 +2195,9 @@ public class ServiceConfiguration implements PulsarConfiguration {

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`"
+ ", schema compatibility strategy check will use it in broker level."
doc = "The schema compatibility strategy in broker level"
)
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED;
private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;

/**** --- WebSocket. --- ****/
@FieldContext(
Expand Down Expand Up @@ -2610,4 +2609,10 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
}
}

public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return SchemaCompatibilityStrategy.FULL;
}
return schemaCompatibilityStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicPolicies;
Expand Down Expand Up @@ -764,6 +765,20 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr
}
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}
return schemaCompatibilityStrategy;
});
}

@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return com.google.common.base.Preconditions.checkNotNull(reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2342,15 +2342,8 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
}
return schemaCompatibilityStrategy;

return policies.schema_compatibility_strategy;
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
Expand Down Expand Up @@ -136,16 +134,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response) {
public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) {
validateDestinationAndAdminOperation(authoritative);

getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy =
pulsar().getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
}
getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
try {
Expand Down Expand Up @@ -199,26 +188,17 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative,
validateDestinationAndAdminOperation(authoritative);

String schemaId = getSchemaId();
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy;
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}

pulsar().getSchemaRegistryService()
.isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build()))
getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar()
.getSchemaRegistryService().isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
.user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(),
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.build())))
.exceptionally(error -> {
response.resume(new RestException(error));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,17 +651,19 @@ protected void setSchemaCompatibilityStrategy(Policies policies) {
if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
schemaCompatibilityStrategy =
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
} else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
return;
}

schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy();
}
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
}
}

private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -42,9 +40,6 @@
@Slf4j
@Test(groups = "broker-admin")
public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -67,8 +62,8 @@ public void cleanup() throws Exception {
}

private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Backward);

Expand All @@ -91,8 +86,8 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
}

private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Forward);

Expand All @@ -114,8 +109,7 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex
}

private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));

try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
p.send(new V1Data("test1", 1));
Expand All @@ -142,8 +136,8 @@ private void testAutoUpdateFull(String namespace, String topicName) throws Excep
}

private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
Expand All @@ -44,9 +45,11 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -60,6 +63,8 @@
public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {

final String cluster = "test";
private final String schemaCompatibilityNamespace = "schematest/test-schema-compatibility-ns";

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -71,6 +76,7 @@ public void setup() throws Exception {
admin.tenants().createTenant("schematest", tenantInfo);
admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test"));
admin.namespaces().createNamespace("schematest/"+cluster+"/test", Sets.newHashSet("test"));
admin.namespaces().createNamespace(schemaCompatibilityNamespace, Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -348,4 +354,51 @@ public long getCToken() {
assertEquals(ledgerInfo.entries, entryId + 1);
assertEquals(ledgerInfo.size, length);
}

@Test
public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException {
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED);
}

@Test
public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException {
assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace));
}

@Test
public void testGetSchemaCompatibilityStrategyWhenSetSchemaAutoUpdateCompatibilityStrategy()
throws PulsarAdminException {
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaAutoUpdateCompatibilityStrategy.Forward);
Awaitility.await().untilAsserted(() -> assertEquals(SchemaAutoUpdateCompatibilityStrategy.Forward,
admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace)
));

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaCompatibilityStrategy.BACKWARD);
Awaitility.await().untilAsserted(() -> assertEquals(SchemaCompatibilityStrategy.BACKWARD,
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace)));
}

@Test
public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpdateCompatibilityStrategy()
throws PulsarAdminException {
pulsar.getConfiguration().setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.FORWARD);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace,
SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible);
Awaitility.await().untilAsserted(() -> assertEquals(
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
SchemaCompatibilityStrategy.UNDEFINED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
Expand Down Expand Up @@ -320,7 +320,7 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
Expand Down Expand Up @@ -399,7 +399,7 @@ public void testSchemaComparison() throws Exception {
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);
SchemaCompatibilityStrategy.UNDEFINED);
byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
.getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes();
SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
Expand Down
Loading