Skip to content

Commit

Permalink
[feat][connector] ElasticSearch Sink: add an option to copy key fields…
Browse files Browse the repository at this point in the history
… into the value (apache#17117)
  • Loading branch information
nicoloboschi authored and nodece committed Sep 8, 2022
1 parent f55d4f5 commit 59194b8
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ public class ElasticSearchConfig implements Serializable {
)
private IdHashingAlgorithm idHashingAlgorithm = IdHashingAlgorithm.NONE;

@FieldDoc(
defaultValue = "false",
help = "When the message key schema is AVRO or JSON, copy the message key fields into the Elasticsearch _source."
)
private boolean copyKeyFields = false;

public enum MalformedDocAction {
IGNORE,
WARN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,15 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
String doc = null;
if (value != null) {
if (valueSchema != null) {
doc = stringifyValue(valueSchema, value);
if (elasticSearchConfig.isCopyKeyFields()
&& (keySchema.getSchemaInfo().getType().equals(SchemaType.AVRO)
|| keySchema.getSchemaInfo().getType().equals(SchemaType.JSON))) {
JsonNode keyNode = extractJsonNode(keySchema, key);
JsonNode valueNode = extractJsonNode(valueSchema, value);
doc = stringify(JsonConverter.topLevelMerge(keyNode, valueNode));
} else {
doc = stringifyValue(valueSchema, value);
}
} else {
if (value.getNativeObject() instanceof byte[]) {
// for BWC with the ES-Sink
Expand Down Expand Up @@ -330,6 +338,10 @@ public String stringifyKey(JsonNode jsonNode, List<String> fields) throws JsonPr

/**
 * Serializes a record value to a JSON string document.
 * The value is first converted to its schema-aware JSON representation and
 * then rendered through {@code stringify}, which applies the configured
 * null-stripping behavior.
 *
 * @param schema the Pulsar schema describing the value
 * @param val    the record value to serialize
 * @return the JSON document as a string
 * @throws JsonProcessingException if the value cannot be serialized to JSON
 */
public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
    return stringify(extractJsonNode(schema, val));
}

public String stringify(JsonNode jsonNode) throws JsonProcessingException {
return elasticSearchConfig.isStripNulls()
? objectMapper.writeValueAsString(stripNullNodes(jsonNode))
: objectMapper.writeValueAsString(jsonNode);
Expand All @@ -349,6 +361,9 @@ public static JsonNode stripNullNodes(JsonNode node) {
}

public static JsonNode extractJsonNode(Schema<?> schema, Object val) {
if (val == null) {
return null;
}
switch (schema.getSchemaInfo().getType()) {
case JSON:
return (JsonNode) ((GenericRecord) val).getNativeObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class JsonConverter {
private static Map<String, LogicalTypeConverter<?>> logicalTypeConverters = new HashMap<>();
private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);

/**
 * Merges the top-level fields of two JSON nodes into a new object node.
 * Fields from {@code n2} overwrite same-named fields from {@code n1}.
 * Either argument may be null (e.g. {@code extractJsonNode} returns null for a
 * null value), in which case it simply contributes no fields.
 *
 * @param n1 first JSON node (the record key in the copyKeyFields path), may be null
 * @param n2 second JSON node (the record value in the copyKeyFields path), may be null
 * @return a new ObjectNode containing the union of both nodes' top-level fields
 */
public static JsonNode topLevelMerge(JsonNode n1, JsonNode n2) {
    ObjectNode objectNode = jsonNodeFactory.objectNode();
    // ObjectNode.put(String, JsonNode) is deprecated since Jackson 2.4; set() is the
    // non-deprecated equivalent for attaching a JsonNode child.
    if (n1 != null) {
        n1.fieldNames().forEachRemaining(f -> objectNode.set(f, n1.get(f)));
    }
    if (n2 != null) {
        n2.fieldNames().forEachRemaining(f -> objectNode.set(f, n2.get(f)));
    }
    return objectNode;
}

public static JsonNode toJson(GenericRecord genericRecord) {
if (genericRecord == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public final void defaultValueTest() throws IOException {
assertEquals(config.getSocketTimeoutInMs(), 60000);

assertEquals(config.isStripNulls(), true);
assertEquals(config.isCopyKeyFields(), false);
assertEquals(config.isSchemaEnable(), false);
assertEquals(config.isKeyIgnore(), true);
assertEquals(config.getMalformedDocAction(), ElasticSearchConfig.MalformedDocAction.FAIL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,23 +226,44 @@ public GenericObject getValue() {
assertEquals(pair.getLeft(), "[\"1\",1]");
assertEquals(pair.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");

ElasticSearchSink elasticSearchSink2 = new ElasticSearchSink();
elasticSearchSink2.open(ImmutableMap.of(
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
"elasticSearchUrl", "http://localhost:9200",
"compatibilityMode", "ELASTICSEARCH",
"schemaEnable", "true",
"keyIgnore", "false",
"copyKeyFields", "true"), null);
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getLeft(), "[\"1\",1]");
assertEquals(pair.getRight(), "{\"a\":\"1\",\"b\":1,\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");

elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
"elasticSearchUrl", "http://localhost:9200",
"compatibilityMode", "ELASTICSEARCH",
"schemaEnable", "true"), null);
Pair<String, String> pair2 = elasticSearchSink2.extractIdAndDocument(genericObjectRecord);
assertNull(pair2.getLeft());
assertEquals(pair2.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");

pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertNull(pair.getLeft());
assertEquals(pair.getRight(), "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");

elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of("elasticSearchUrl", "http://localhost:9200",
"schemaEnable", "true",
"compatibilityMode", "ELASTICSEARCH",
"copyKeyFields", "true"), null);
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertNull(pair.getLeft());
assertEquals(pair.getRight(), "{\"a\":\"1\",\"b\":1,\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");

// test null value
ElasticSearchSink elasticSearchSink3 = new ElasticSearchSink();
elasticSearchSink3.open(ImmutableMap.of(
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
"elasticSearchUrl", "http://localhost:9200",
"compatibilityMode", "ELASTICSEARCH",
"schemaEnable", "true",
"keyIgnore", "false"), null);
Pair<String, String> pair3 = elasticSearchSink.extractIdAndDocument(new Record<GenericObject>() {
pair = elasticSearchSink.extractIdAndDocument(new Record<GenericObject>() {
@Override
public Optional<String> getTopicName() {
return Optional.of("data-ks1.table1");
Expand All @@ -268,8 +289,8 @@ public Object getNativeObject() {
};
}
});
assertEquals(pair3.getLeft(), "[\"1\",1]");
assertNull(pair3.getRight());
assertEquals(pair.getLeft(), "[\"1\",1]");
assertNull(pair.getRight());
}

@Test(dataProvider = "schemaType")
Expand Down
1 change: 1 addition & 0 deletions site2/docs/io-elasticsearch-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The configuration of the Elasticsearch sink connector has the following properti
| `canonicalKeyFields` | Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to `true` and the record key schema is `JSON` or `AVRO`, the serialized object does not consider the order of properties. |
| `stripNonPrintableCharacters` | Boolean| false | true| Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document. |
| `idHashingAlgorithm` | enum(NONE,SHA256,SHA512)| false | NONE|Hashing algorithm to use for the document id. This is useful in order to be compliant with the ElasticSearch _id hard limit of 512 bytes. |
| `copyKeyFields` | Boolean | false | false | If the message key schema is AVRO or JSON, the message key fields are copied into the ElasticSearch document. |

### Definition of ElasticSearchSslConfig structure:

Expand Down

0 comments on commit 59194b8

Please sign in to comment.