Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParseJsonProcessor initial implementation #1688

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

// Build script for the parse-json-processor Data Prepper plugin.
plugins {
id 'java'
}

repositories {
mavenCentral()
}

dependencies {
// Data Prepper plugin API and shared plugin utilities (sibling projects in this build).
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
// No explicit version — presumably managed by the root build's dependency constraints; confirm.
implementation 'com.fasterxml.jackson.core:jackson-databind'
}

// Run unit tests on the JUnit 5 (Jupiter) platform.
// NOTE(review): the extracted page had review-comment text interleaved inside this
// block; the script below is the valid Gradle configuration with that text removed.
test {
    useJUnitPlatform()
}

// Enforce a minimum code-coverage ratio; the build's `check` task fails below 96%.
// The threshold is slightly conservative: empty shutdown-lifecycle methods are not
// unit-tested, while implemented line and branch coverage are 100%.
jacocoTestCoverageVerification {
    dependsOn jacocoTestReport
    violationRules {
        rule {
            limit {
                minimum = 0.96
            }
        }
    }
}

check.dependsOn jacocoTestCoverageVerification
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.processor.parsejson;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * A processor that parses a JSON string held in one field of an {@link Event} and writes
 * the resulting key/value pairs either to the Event root (when no destination is
 * configured) or to a single destination field.
 */
@DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class)
public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
    private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class);

    // ObjectMapper is thread-safe and expensive to construct; share a single instance
    // instead of creating a new one on every doExecute() invocation.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final TypeReference<HashMap<String, Object>> MAP_TYPE_REFERENCE =
            new TypeReference<HashMap<String, Object>>() {};

    private final String source;
    private final String destination;

    /**
     * @param pluginMetrics metrics collaborator required by {@link AbstractProcessor}
     * @param parseJsonProcessorConfig supplies the source field name and the optional
     *                                 destination field name (null means "write to root")
     */
    @DataPrepperPluginConstructor
    public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig) {
        super(pluginMetrics);

        source = parseJsonProcessorConfig.getSource();
        destination = parseJsonProcessorConfig.getDestination();
    }

    /**
     * Parses the JSON string found in the configured source field of each record's Event.
     * On invalid JSON the event is left unmodified and an error is logged; the record
     * still flows through the pipeline.
     *
     * @param records the incoming records; the same collection is returned, mutated in place
     * @return the input collection with each Event enriched by the parsed JSON
     */
    @Override
    public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
        // A null destination means "merge the parsed keys into the Event root".
        final boolean doWriteToRoot = Objects.isNull(destination);

        for (final Record<Event> record : records) {
            final Event event = record.getData();
            final String message = event.get(source, String.class);
            try {
                final Map<String, Object> parsedJson = OBJECT_MAPPER.readValue(message, MAP_TYPE_REFERENCE);

                if (doWriteToRoot) {
                    writeToRoot(event, parsedJson);
                } else {
                    // Existing destination values are overwritten.
                    event.put(destination, parsedJson);
                }
            } catch (final JsonProcessingException jsonException) {
                // Deliberate best-effort: do not drop the record when its payload is not valid JSON.
                LOG.error("An exception occurred due to invalid JSON while reading event [{}]", event, jsonException);
            }
        }
        return records;
    }

    /** No special preparation is needed before shutdown. */
    @Override
    public void prepareForShutdown() {

    }

    /** This processor holds no buffered state, so it is always ready to shut down. */
    @Override
    public boolean isReadyForShutdown() {
        return true;
    }

    /** No resources to release on shutdown. */
    @Override
    public void shutdown() {

    }

    /** Merges every top-level key of the parsed JSON into the Event root, overwriting collisions. */
    private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
        for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
            event.put(entry.getKey(), entry.getValue());
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.parsejson;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;

public class ParseJsonProcessorConfig {
static final String DEFAULT_SOURCE = "message";

@NotBlank
@JsonProperty("source")
private String source = DEFAULT_SOURCE;

@JsonProperty("destination")
private String destination;

/**
* The field of the Event that contains the JSON data.
*
* @return The name of the source field.
*/
public String getSource() {
return source;
}

/**
* The destination that the parsed JSON is written to. Defaults to the root of the Event.
* If the destination field already exists, it will be overwritten.
*
* @return The name of the destination field.
*/
public String getDestination() {
Copy link
Contributor Author

@finnroblin finnroblin Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better for the default destination (Event root) to be / instead of null. Then we avoid some null values and my opinion is that / is more declarative. What do other people think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we used / as the root what would the expected behavior be if someone had multiple /s in the destination? One might expect it to create a nested field similar to directory syntax.

ex: Would destination: /foo/bar become foo.bar: <parsed_json> where bar is a nested field?

That might be useful, but I don't think that's what we're going for here so maybe / would be misleading. I do think your point about a more declarative default is valid though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the behavior if someone had multiple / in the destination (/foo/bar) is that the parsed json will be written to the most nested field in the destination. So if the json is {"key":"value"} and the destination is (/foo/bar), event.get("/foo/bar", Map.class) == {"key":"value"} and event.get("/foo/bar/key", String.class) == "value". Right now the parse_json processor doesn't flatten nested fields (so /foo/bar is not flattened to foo.bar), it just puts any nested json into the event without changing it.

I do see how / is misleading. It's also misleading for another reason: calling event.put("/", value) will not put value into the event root because the JacksonEvent implementation trims leading slashes. So it would not write value to the event at all.

I think we can stick with a null default and be more declarative by mentioning that destination is null -> write to root in documentation.

return destination;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.processor.parsejson;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.record.Record;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.util.Map.entry;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link ParseJsonProcessor}: source/destination routing, overwrite
 * semantics, invalid-JSON handling, and deeply nested JSON reachable via JSON pointers.
 */
@ExtendWith(MockitoExtension.class)
class ParseJsonProcessorTest {
// Top-level key used by the deeply-nested JSON fixtures below.
private static final String DEEPLY_NESTED_KEY_NAME = "base";

@Mock
private ParseJsonProcessorConfig processorConfig;

@Mock
private PluginMetrics pluginMetrics;

private ParseJsonProcessor parseJsonProcessor;

@BeforeEach
void setup() {
// Stub the config mock with the plugin's real defaults (source = "message", destination = null).
// Individual tests re-stub and then recreate the processor, since the constructor reads the config once.
ParseJsonProcessorConfig defaultConfig = new ParseJsonProcessorConfig();
when(processorConfig.getSource()).thenReturn(defaultConfig.getSource());
when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination());

parseJsonProcessor = createObjectUnderTest();
}

// Constructs the processor from the current state of the mocked config.
private ParseJsonProcessor createObjectUnderTest() {
return new ParseJsonProcessor(pluginMetrics, processorConfig);
}

@Test
void test_when_differentSourceAndDestination_then_processorParsesCorrectly() {
final String source = "different_source";
final String destination = "destination_key";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getDestination()).thenReturn(destination);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

final Map<String, Object> data = Collections.singletonMap("key", "value");
final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

// Both the original source field and the new destination field should be present.
assertThat(parsedEvent.containsKey(source), equalTo(true));
assertThat(parsedEvent.containsKey(destination), equalTo(true));

assertThatFirstMapIsSubsetOfSecondMap(data, parsedEvent.get(destination, Map.class));
}

@Test
void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() {
final String source = "root_source";
when(processorConfig.getSource()).thenReturn(source);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

// The JSON payload itself contains a key equal to the source field name,
// so writing to root must overwrite the original source value.
final Map<String, Object> data = Map.ofEntries(
entry(source,"value_that_will_overwrite_source"),
entry("key","value")
);

final String serializedMessage = convertMapToJSONString(data);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, source, "value_that_will_overwrite_source");
assertThatKeyEquals(parsedEvent, "key", "value");
}

@Test
void test_when_valueIsEmpty_then_notParsed() {
// convertMapToJSONString does not quote empty values, yielding invalid JSON on purpose.
final Map<String, Object> emptyData = Collections.singletonMap("key",""); // invalid JSON

final String serializedMessage = convertMapToJSONString(emptyData);
final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

// The event must be unchanged: only the raw source field, no parsed keys.
assertThatKeyEquals(parsedEvent, processorConfig.getSource(), serializedMessage);
assertThat(parsedEvent.toMap().size(), equalTo(1));
}

@Test
void test_when_deeplyNestedFieldInRoot_then_canReachDeepestLayer() {
final int numberOfLayers = 200;
final Map<String, Object> messageMap = constructArbitrarilyDeepJsonMap(numberOfLayers);
final String serializedMessage = convertMapToJSONString(messageMap);

final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

assertThatKeyEquals(parsedEvent, DEEPLY_NESTED_KEY_NAME, messageMap.get(DEEPLY_NESTED_KEY_NAME));
// The innermost value must be reachable with a JSON pointer through all layers.
final String jsonPointerToValue = constructDeeplyNestedJsonPointer(numberOfLayers);
assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value"));
}

@Test
void test_when_deeplyNestedFieldInKey_then_canReachDeepestLayer() {
final String destination = "destination_key";
when(processorConfig.getDestination()).thenReturn(destination);
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used

// Fewer layers than the root test — presumably to keep the pointer length manageable; confirm.
final int numberOfLayers = 20;
final Map<String, Object> messageMap = constructArbitrarilyDeepJsonMap(numberOfLayers);
final String serializedMessage = convertMapToJSONString(messageMap);

final Event parsedEvent = createAndParseMessageEvent(serializedMessage);

final String completeDeeplyNestedKeyName = destination + "/" + DEEPLY_NESTED_KEY_NAME;
assertThatKeyEquals(parsedEvent, completeDeeplyNestedKeyName, messageMap.get(DEEPLY_NESTED_KEY_NAME));
final String jsonPointerToValue = destination + constructDeeplyNestedJsonPointer(numberOfLayers);

assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value"));
}

// Builds "/base/key0/key1/.../key{n-1}" for reaching the innermost nested value.
private String constructDeeplyNestedJsonPointer(final int numberOfLayers) {
String pointer = "/" + DEEPLY_NESTED_KEY_NAME;
for (int layer = 0; layer < numberOfLayers; layer++) {
pointer += "/key" + layer;
}
return pointer;
}

/**
 * Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation).
 * @param messageMap the map to serialize
 * @return a JSON-like string; invalid JSON when a value is empty (exploited by test_when_valueIsEmpty_then_notParsed)
 */
private String convertMapToJSONString(final Map<String, Object> messageMap) {
final String replaceEquals = messageMap.toString().replace("=",":");
final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes
return addQuotes;
}

/**
 * Creates a Map that maps a single key to a value nested numberOfLayers layers deep.
 * @param numberOfLayers depth of nesting below DEEPLY_NESTED_KEY_NAME
 * @return a single-entry map whose value is a chain of nested single-entry maps ending in "value"
 */
private Map<String, Object> constructArbitrarilyDeepJsonMap(final int numberOfLayers) {
final Map<String, Object> result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers));
return result;
}

// Recursive helper: each layer wraps the next in a singleton map keyed "key{layer}".
private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) {
if (currentLayer >= numberOfLayers) return "value";

final String key = "key" + currentLayer;
return Collections.singletonMap(key, deepJsonMapHelper(currentLayer+1, numberOfLayers));
}

// Wraps the message in a Record, runs the processor, and returns the (mutated) Event.
private Event createAndParseMessageEvent(final String message) {
final Record<Event> eventUnderTest = createMessageEvent(message);
final List<Record<Event>> editedEvents = (List<Record<Event>>) parseJsonProcessor.doExecute(
Collections.singletonList(eventUnderTest));
return editedEvents.get(0).getData();
}


// Creates a Record whose Event holds the message under the configured source field.
private Record<Event> createMessageEvent(final String message) {
final Map<String, Object> eventData = new HashMap<>();
eventData.put(processorConfig.getSource(), message);
return buildRecordWithEvent(eventData);
}

private Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
return new Record<>(JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build());
}

// Asserts the key exists and its value equals the expected value.
private void assertThatKeyEquals(final Event parsedEvent, final String key, final Object value) {
assertThat(parsedEvent.containsKey(key), equalTo(true));
assertThat(parsedEvent.get(key, Object.class), equalTo(value));
}

// Asserts every entry of the first map appears (with an equal value) in the second map.
private void assertThatFirstMapIsSubsetOfSecondMap(final Map<String, Object> subset, final Map<String, Object> secondMap) {
assertThat(Objects.nonNull(subset), equalTo(true));
assertThat(Objects.nonNull(secondMap), equalTo(true));

assertThat((subset.size() <= secondMap.size()), equalTo(true));

for (Map.Entry<String, Object> entry : subset.entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
assertThat(secondMap.containsKey(key), equalTo(true));
assertThat(secondMap.get(key), equalTo(value));
}
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ include 'data-prepper-expression'
include 'data-prepper-plugins:mutate-string-processors'
include 'data-prepper-plugins:s3-source'
include 'data-prepper-plugins:csv-processor'
// Registers the new parse-json-processor plugin module with the multi-project build.
include 'data-prepper-plugins:parse-json-processor'