-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ParseJsonProcessor initial implementation #1688
Changes from all commits
cd8713a
e4ecae2
949011c
68e707e
263f4f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
} | ||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-plugins:common') | ||
implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
} | ||
|
||
test { | ||
useJUnitPlatform() | ||
} | ||
|
||
jacocoTestCoverageVerification { | ||
dependsOn jacocoTestReport | ||
violationRules { | ||
rule { | ||
limit { | ||
minimum = 0.96 | ||
} | ||
} | ||
} | ||
} | ||
|
||
check.dependsOn jacocoTestCoverageVerification |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.processor.parsejson; | ||
|
||
import com.amazon.dataprepper.metrics.PluginMetrics; | ||
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; | ||
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import com.amazon.dataprepper.model.event.Event; | ||
import com.amazon.dataprepper.model.processor.AbstractProcessor; | ||
import com.amazon.dataprepper.model.processor.Processor; | ||
import com.amazon.dataprepper.model.record.Record; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@DataPrepperPlugin(name = "parse_json", pluginType = Processor.class, pluginConfigurationType = ParseJsonProcessorConfig.class) | ||
public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(ParseJsonProcessor.class); | ||
|
||
private final String source; | ||
private final String destination; | ||
@DataPrepperPluginConstructor | ||
public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig) { | ||
super(pluginMetrics); | ||
|
||
source = parseJsonProcessorConfig.getSource(); | ||
destination = parseJsonProcessorConfig.getDestination(); | ||
} | ||
|
||
@Override | ||
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) { | ||
final ObjectMapper objectMapper = new ObjectMapper(); | ||
final boolean doWriteToRoot = Objects.isNull(destination); | ||
|
||
for (final Record<Event> record : records) { | ||
final Event event = record.getData(); | ||
final String message = event.get(source, String.class); | ||
try { | ||
final TypeReference<HashMap<String, Object>> hashMapTypeReference = new TypeReference<HashMap<String, Object>>() {}; | ||
final Map<String, Object> parsedJson = objectMapper.readValue(message, hashMapTypeReference); | ||
|
||
if (doWriteToRoot) { | ||
writeToRoot(event, parsedJson); | ||
} else { | ||
event.put(destination, parsedJson); | ||
} | ||
} catch (final JsonProcessingException jsonException) { | ||
LOG.error("An exception occurred due to invalid JSON while reading event [{}]", event, jsonException); | ||
} | ||
} | ||
return records; | ||
} | ||
|
||
@Override | ||
public void prepareForShutdown() { | ||
|
||
} | ||
|
||
@Override | ||
public boolean isReadyForShutdown() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
|
||
} | ||
|
||
private void writeToRoot(final Event event, final Map<String, Object> parsedJson) { | ||
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) { | ||
event.put(entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.processor.parsejson; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import jakarta.validation.constraints.NotBlank; | ||
|
||
public class ParseJsonProcessorConfig { | ||
static final String DEFAULT_SOURCE = "message"; | ||
|
||
@NotBlank | ||
@JsonProperty("source") | ||
private String source = DEFAULT_SOURCE; | ||
|
||
@JsonProperty("destination") | ||
private String destination; | ||
|
||
/** | ||
* The field of the Event that contains the JSON data. | ||
* | ||
* @return The name of the source field. | ||
*/ | ||
public String getSource() { | ||
return source; | ||
} | ||
|
||
/** | ||
* The destination that the parsed JSON is written to. Defaults to the root of the Event. | ||
* If the destination field already exists, it will be overwritten. | ||
* | ||
* @return The name of the destination field. | ||
*/ | ||
public String getDestination() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better for the default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we used ex: Would That might be useful, but I don't think that's what we're going for here so maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now the behavior if someone had multiple I do see how I think we can stick with a |
||
return destination; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package com.amazon.dataprepper.plugins.processor.parsejson; | ||
|
||
import com.amazon.dataprepper.metrics.PluginMetrics; | ||
import com.amazon.dataprepper.model.event.Event; | ||
import com.amazon.dataprepper.model.event.JacksonEvent; | ||
import com.amazon.dataprepper.model.record.Record; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
|
||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static java.util.Map.entry; | ||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.mockito.Mockito.when; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
class ParseJsonProcessorTest { | ||
private static final String DEEPLY_NESTED_KEY_NAME = "base"; | ||
|
||
@Mock | ||
private ParseJsonProcessorConfig processorConfig; | ||
|
||
@Mock | ||
private PluginMetrics pluginMetrics; | ||
|
||
private ParseJsonProcessor parseJsonProcessor; | ||
|
||
@BeforeEach | ||
void setup() { | ||
ParseJsonProcessorConfig defaultConfig = new ParseJsonProcessorConfig(); | ||
when(processorConfig.getSource()).thenReturn(defaultConfig.getSource()); | ||
when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); | ||
|
||
parseJsonProcessor = createObjectUnderTest(); | ||
} | ||
|
||
private ParseJsonProcessor createObjectUnderTest() { | ||
return new ParseJsonProcessor(pluginMetrics, processorConfig); | ||
} | ||
|
||
@Test | ||
void test_when_differentSourceAndDestination_then_processorParsesCorrectly() { | ||
final String source = "different_source"; | ||
final String destination = "destination_key"; | ||
when(processorConfig.getSource()).thenReturn(source); | ||
when(processorConfig.getDestination()).thenReturn(destination); | ||
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used | ||
|
||
final Map<String, Object> data = Collections.singletonMap("key", "value"); | ||
final String serializedMessage = convertMapToJSONString(data); | ||
final Event parsedEvent = createAndParseMessageEvent(serializedMessage); | ||
|
||
assertThat(parsedEvent.containsKey(source), equalTo(true)); | ||
assertThat(parsedEvent.containsKey(destination), equalTo(true)); | ||
|
||
assertThatFirstMapIsSubsetOfSecondMap(data, parsedEvent.get(destination, Map.class)); | ||
} | ||
|
||
@Test | ||
void test_when_dataFieldEqualToRootField_then_overwritesOriginalFields() { | ||
final String source = "root_source"; | ||
when(processorConfig.getSource()).thenReturn(source); | ||
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used | ||
|
||
final Map<String, Object> data = Map.ofEntries( | ||
entry(source,"value_that_will_overwrite_source"), | ||
entry("key","value") | ||
); | ||
|
||
final String serializedMessage = convertMapToJSONString(data); | ||
final Event parsedEvent = createAndParseMessageEvent(serializedMessage); | ||
|
||
assertThatKeyEquals(parsedEvent, source, "value_that_will_overwrite_source"); | ||
assertThatKeyEquals(parsedEvent, "key", "value"); | ||
} | ||
|
||
@Test | ||
void test_when_valueIsEmpty_then_notParsed() { | ||
final Map<String, Object> emptyData = Collections.singletonMap("key",""); // invalid JSON | ||
|
||
final String serializedMessage = convertMapToJSONString(emptyData); | ||
final Event parsedEvent = createAndParseMessageEvent(serializedMessage); | ||
|
||
assertThatKeyEquals(parsedEvent, processorConfig.getSource(), serializedMessage); | ||
assertThat(parsedEvent.toMap().size(), equalTo(1)); | ||
} | ||
|
||
@Test | ||
void test_when_deeplyNestedFieldInRoot_then_canReachDeepestLayer() { | ||
final int numberOfLayers = 200; | ||
final Map<String, Object> messageMap = constructArbitrarilyDeepJsonMap(numberOfLayers); | ||
final String serializedMessage = convertMapToJSONString(messageMap); | ||
|
||
final Event parsedEvent = createAndParseMessageEvent(serializedMessage); | ||
|
||
assertThatKeyEquals(parsedEvent, DEEPLY_NESTED_KEY_NAME, messageMap.get(DEEPLY_NESTED_KEY_NAME)); | ||
final String jsonPointerToValue = constructDeeplyNestedJsonPointer(numberOfLayers); | ||
assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); | ||
} | ||
|
||
@Test | ||
void test_when_deeplyNestedFieldInKey_then_canReachDeepestLayer() { | ||
final String destination = "destination_key"; | ||
when(processorConfig.getDestination()).thenReturn(destination); | ||
parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used | ||
|
||
final int numberOfLayers = 20; | ||
final Map<String, Object> messageMap = constructArbitrarilyDeepJsonMap(numberOfLayers); | ||
final String serializedMessage = convertMapToJSONString(messageMap); | ||
|
||
final Event parsedEvent = createAndParseMessageEvent(serializedMessage); | ||
|
||
final String completeDeeplyNestedKeyName = destination + "/" + DEEPLY_NESTED_KEY_NAME; | ||
assertThatKeyEquals(parsedEvent, completeDeeplyNestedKeyName, messageMap.get(DEEPLY_NESTED_KEY_NAME)); | ||
final String jsonPointerToValue = destination + constructDeeplyNestedJsonPointer(numberOfLayers); | ||
|
||
assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); | ||
} | ||
|
||
private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { | ||
String pointer = "/" + DEEPLY_NESTED_KEY_NAME; | ||
for (int layer = 0; layer < numberOfLayers; layer++) { | ||
pointer += "/key" + layer; | ||
} | ||
return pointer; | ||
} | ||
|
||
/** | ||
* Naive serialization that converts every = to : and wraps every word with double quotes (no error handling or input validation). | ||
* @param messageMap | ||
* @return | ||
*/ | ||
private String convertMapToJSONString(final Map<String, Object> messageMap) { | ||
final String replaceEquals = messageMap.toString().replace("=",":"); | ||
final String addQuotes = replaceEquals.replaceAll("(\\w+)", "\"$1\""); // wrap every word in quotes | ||
return addQuotes; | ||
} | ||
|
||
/** | ||
* Creates a Map that maps a single key to a value nested numberOfLayers layers deep. | ||
* @param numberOfLayers | ||
* @return | ||
*/ | ||
private Map<String, Object> constructArbitrarilyDeepJsonMap(final int numberOfLayers) { | ||
final Map<String, Object> result = Collections.singletonMap(DEEPLY_NESTED_KEY_NAME,deepJsonMapHelper(0,numberOfLayers)); | ||
return result; | ||
} | ||
|
||
private Object deepJsonMapHelper(final int currentLayer, final int numberOfLayers) { | ||
if (currentLayer >= numberOfLayers) return "value"; | ||
|
||
final String key = "key" + currentLayer; | ||
return Collections.singletonMap(key, deepJsonMapHelper(currentLayer+1, numberOfLayers)); | ||
} | ||
|
||
private Event createAndParseMessageEvent(final String message) { | ||
final Record<Event> eventUnderTest = createMessageEvent(message); | ||
final List<Record<Event>> editedEvents = (List<Record<Event>>) parseJsonProcessor.doExecute( | ||
Collections.singletonList(eventUnderTest)); | ||
return editedEvents.get(0).getData(); | ||
} | ||
|
||
|
||
private Record<Event> createMessageEvent(final String message) { | ||
final Map<String, Object> eventData = new HashMap<>(); | ||
eventData.put(processorConfig.getSource(), message); | ||
return buildRecordWithEvent(eventData); | ||
} | ||
|
||
private Record<Event> buildRecordWithEvent(final Map<String, Object> data) { | ||
return new Record<>(JacksonEvent.builder() | ||
.withData(data) | ||
.withEventType("event") | ||
.build()); | ||
} | ||
|
||
private void assertThatKeyEquals(final Event parsedEvent, final String key, final Object value) { | ||
assertThat(parsedEvent.containsKey(key), equalTo(true)); | ||
assertThat(parsedEvent.get(key, Object.class), equalTo(value)); | ||
} | ||
|
||
private void assertThatFirstMapIsSubsetOfSecondMap(final Map<String, Object> subset, final Map<String, Object> secondMap) { | ||
assertThat(Objects.nonNull(subset), equalTo(true)); | ||
assertThat(Objects.nonNull(secondMap), equalTo(true)); | ||
|
||
assertThat((subset.size() <= secondMap.size()), equalTo(true)); | ||
|
||
for (Map.Entry<String, Object> entry : subset.entrySet()) { | ||
final String key = entry.getKey(); | ||
final Object value = entry.getValue(); | ||
assertThat(secondMap.containsKey(key), equalTo(true)); | ||
assertThat(secondMap.get(key), equalTo(value)); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend setting a minimum test coverage to maintain our high coding standards. Here is an example for another plugin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the recommendation!
I set the threshold to 96% but it's slightly higher in reality because there aren't any unit tests for the
prepareForShutdown
,isReadyForShutdown
, andshutdown
methods. These methods are needed to extendAbstractProcessor
but they're empty becauseparse_json
doesn't have any special behavior for shutdown. The implemented line and branch coverage are 100%.