diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 0692c48f151a0..3fb6b58c41144 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -370,10 +370,12 @@ static Schema> KeyValue(Class key, Class value) { } /** - * Key Value Schema using passed in key and value schemas. + * Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type. + * + * @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType) */ static Schema> KeyValue(Schema key, Schema value) { - return DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value); + return KeyValue(key, value, KeyValueEncodingType.INLINE); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index da85d3645be64..e7c00ac212879 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -131,8 +131,6 @@ Schema newProtobufNativeSc Schema> newKeyValueBytesSchema(); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema); - Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index eb555112c2848..30c4284d43c5a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -242,10 +242,6 @@ public Schema> newKeyValueBytesSchema() { return KeyValueSchemaImpl.kvBytes(); } - public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema) { - return KeyValueSchemaImpl.of(keySchema, valueSchema); - } - public Schema> newKeyValueSchema(Schema keySchema, Schema valueSchema, KeyValueEncodingType keyValueEncodingType) { return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 320914396910b..f41ff4bc7786c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; @@ -32,7 +33,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.schema.KeyValueEncodingType; @@ -105,14 +106,12 @@ public CompletableFuture sendAsync() { @Override public TypedMessageBuilder key(String key) { - if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - checkArgument(kvSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, - "This method is not allowed to set keys when in encoding type is SEPARATED"); - if (key == null) { - msgMetadata.setNullPartitionKey(true); - return this; - } + getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( + keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, + "This method is not allowed to set keys when in encoding type is SEPARATED")); + if (key == null) { + msgMetadata.setNullPartitionKey(true); + return this; } msgMetadata.setPartitionKey(key); msgMetadata.setPartitionKeyB64Encoded(false); @@ -121,14 +120,12 @@ public TypedMessageBuilder key(String key) { @Override public TypedMessageBuilder keyBytes(byte[] key) { - if (schema instanceof KeyValueSchemaImpl && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED), - "This method is not allowed to set keys when in encoding type is SEPARATED"); - if (key == null) { - msgMetadata.setNullPartitionKey(true); - return this; - } + getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument( + keyValueSchema.getKeyValueEncodingType() != KeyValueEncodingType.SEPARATED, + "This method is not allowed to set keys when in encoding type is SEPARATED")); + if (key == null) { + msgMetadata.setNullPartitionKey(true); + return this; } msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key)); msgMetadata.setPartitionKeyB64Encoded(true); @@ -147,31 +144,18 @@ public TypedMessageBuilder value(T value) { msgMetadata.setNullValue(true); return this; } - if (value instanceof org.apache.pulsar.common.schema.KeyValue - && schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { - KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema; - org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value; - if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - // set key as the message key - if (kv.getKey() != null) { - msgMetadata.setPartitionKey( - Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey()))); - msgMetadata.setPartitionKeyB64Encoded(true); - } else { - this.msgMetadata.setNullPartitionKey(true); - } - - // set value as the payload - if (kv.getValue() != null) { - this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue())); - } else { - this.msgMetadata.setNullValue(true); - } + + return getKeyValueSchema().map(keyValueSchema -> { + if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + setSeparateKeyValue(value, keyValueSchema); return this; + } else { + return null; } - } - this.content = ByteBuffer.wrap(schema.encode(value)); - return this; + }).orElseGet(() -> { + content = ByteBuffer.wrap(schema.encode(value)); + return this; + }); } @Override @@ -300,4 +284,38 @@ public String getKey() { public ByteBuffer getContent() { return content; } + + private Optional> getKeyValueSchema() { + if (schema.getSchemaInfo() != null + && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE + // The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE + && schema instanceof KeyValueSchema) { + return Optional.of((KeyValueSchema) schema); + } else { + return Optional.empty(); + } + } + + @SuppressWarnings("unchecked") + private void setSeparateKeyValue(T value, KeyValueSchema keyValueSchema) { + checkArgument(value instanceof org.apache.pulsar.common.schema.KeyValue); + org.apache.pulsar.common.schema.KeyValue keyValue = + (org.apache.pulsar.common.schema.KeyValue) value; + + // set key as the message key + if (keyValue.getKey() != null) { + msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString( + keyValueSchema.getKeySchema().encode(keyValue.getKey()))); + msgMetadata.setPartitionKeyB64Encoded(true); + } else { + msgMetadata.setNullPartitionKey(true); + } + + // set value as the payload + if (keyValue.getValue() != null) { + content = ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue())); + } else { + msgMetadata.setNullValue(true); + } + } }