Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update plugins for new plugin framework #451

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.amazon.dataprepper.model.sink.Sink;
import com.amazon.dataprepper.model.source.Source;
import com.amazon.dataprepper.plugins.TestSink;
import com.amazon.dataprepper.plugins.TestSinkUpdated;
import com.amazon.dataprepper.plugins.TestSinkDeprecatedApproach;
import com.amazon.dataprepper.plugins.TestSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
Expand Down Expand Up @@ -85,7 +85,7 @@ class WithPredefinedPlugins {
void setUp() {
given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(new HashSet<>(Arrays.asList(
TestSink.class, TestSource.class, TestSinkUpdated.class)));
TestSink.class, TestSource.class, TestSinkDeprecatedApproach.class)));
}

@Test
Expand All @@ -104,18 +104,18 @@ void findPlugin_should_return_empty_if_plugin_found_for_another_type() {

@Test
void findPlugin_should_return_plugin_if_found_for_name_and_type() {
final Optional<Class<? extends Sink>> optionalPlugin = createObjectUnderTest().findPluginClass(Sink.class, "test_sink");
final Optional<Class<? extends Sink>> optionalPlugin = createObjectUnderTest().findPluginClass(Sink.class, "test_sink_deprecated_type");
assertThat(optionalPlugin, notNullValue());
assertThat(optionalPlugin.isPresent(), equalTo(true));
assertThat(optionalPlugin.get(), equalTo(TestSink.class));
assertThat(optionalPlugin.get(), equalTo(TestSinkDeprecatedApproach.class));
}

@Test
void findPlugin_should_return_plugin_if_found_for_name_and_type_using_pluginType() {
final Optional<Class<? extends Sink>> optionalPlugin = createObjectUnderTest().findPluginClass(Sink.class, "test_sink_updated");
final Optional<Class<? extends Sink>> optionalPlugin = createObjectUnderTest().findPluginClass(Sink.class, "test_sink");
assertThat(optionalPlugin, notNullValue());
assertThat(optionalPlugin.isPresent(), equalTo(true));
assertThat(optionalPlugin.get(), equalTo(TestSinkUpdated.class));
assertThat(optionalPlugin.get(), equalTo(TestSink.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.Sink;
Expand All @@ -21,7 +20,7 @@
import java.util.List;
import java.util.stream.Collectors;

@DataPrepperPlugin(name = "test_sink", type = PluginType.SINK)
@DataPrepperPlugin(name = "test_sink", pluginType = Sink.class)
public class TestSink implements Sink<Record<String>> {
private final List<Record<String>> collectedRecords;
private final boolean failSinkForTest;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.amazon.dataprepper.plugins;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.Sink;

import java.util.Collection;

@DataPrepperPlugin(name = "test_sink_updated", pluginType = Sink.class)
public class TestSinkUpdated implements Sink<Record<String>> {
@DataPrepperPlugin(name = "test_sink_deprecated_type", type = PluginType.SINK)
public class TestSinkDeprecatedApproach implements Sink<Record<String>> {
@Override
public void output(final Collection<Record<String>> records) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.record.Record;
Expand All @@ -23,7 +22,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "test_source", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "test_source", pluginType = Source.class)
public class TestSource implements Source<Record<String>> {
public static final List<Record<String>> TEST_DATA = Stream.of("TEST")
.map(Record::new).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@

package com.amazon.dataprepper.plugins.buffer.blockingbuffer;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.CheckpointState;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.AbstractBuffer;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.buffer.SizeOverflowException;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.CheckpointState;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,7 +45,7 @@
* record is null. {@link #read(int)} retrieves and removes the batch of records from the head of the queue. The
* batch size is defined/determined by the configuration attribute {@link #ATTRIBUTE_BATCH_SIZE} or the timeout parameter
*/
@DataPrepperPlugin(name = "bounded_blocking", type = PluginType.BUFFER)
@DataPrepperPlugin(name = "bounded_blocking", pluginType = Buffer.class)
public class BlockingBuffer<T extends Record<?>> extends AbstractBuffer<T> {
private static final Logger LOG = LoggerFactory.getLogger(BlockingBuffer.class);
private static final int DEFAULT_BUFFER_CAPACITY = 512;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
Expand Down Expand Up @@ -51,7 +52,7 @@ private static void findPlugins() {
final DataPrepperPlugin dataPrepperPluginAnnotation = annotatedClass
.getAnnotation(DataPrepperPlugin.class);
final String pluginName = dataPrepperPluginAnnotation.name();
final PluginType pluginType = dataPrepperPluginAnnotation.type();
final PluginType pluginType = extractPluginType(dataPrepperPluginAnnotation);
switch (pluginType) {
case SOURCE:
SOURCES.put(pluginName, (Class<Source>) annotatedClass);
Expand All @@ -69,6 +70,26 @@ private static void findPlugins() {
}
}

private static PluginType extractPluginType(final DataPrepperPlugin dataPrepperPluginAnnotation) {
PluginType pluginType = dataPrepperPluginAnnotation.type();
if(pluginType == PluginType.NONE) {
final Class<?> pluginClassType = dataPrepperPluginAnnotation.pluginType();
if(Objects.equals(pluginClassType, Source.class)) {
pluginType = PluginType.SOURCE;
}
else if(Objects.equals(pluginClassType, Buffer.class)) {
pluginType = PluginType.BUFFER;
}
else if(Objects.equals(pluginClassType, Prepper.class)) {
pluginType = PluginType.PREPPER;
}
else if(Objects.equals(pluginClassType, Sink.class)) {
pluginType = PluginType.SINK;
}
}
return pluginType;
}

public static Class<Source> getSourceClass(final String name) {
return SOURCES.get(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@

package com.amazon.dataprepper.plugins.prepper;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.model.record.Record;

import java.util.Collection;

@DataPrepperPlugin(name = "no-op", type = PluginType.PREPPER)
@DataPrepperPlugin(name = "no-op", pluginType = Prepper.class)
public class NoOpPrepper<InputT extends Record<?>> implements Prepper<InputT, InputT> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.prepper;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.Prepper;
Expand All @@ -24,7 +23,7 @@
* An simple String implementation of {@link Prepper} which generates new Records with upper case or lowercase content. The current
* simpler implementation does not handle errors (if any).
*/
@DataPrepperPlugin(name = "string_converter", type = PluginType.PREPPER)
@DataPrepperPlugin(name = "string_converter", pluginType = Prepper.class)
public class StringPrepper implements Prepper<Record<String>, Record<String>> {

public static final String UPPER_CASE = "upper_case";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.sink;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
Expand All @@ -26,7 +25,7 @@

import static java.lang.String.format;

@DataPrepperPlugin(name = "file", type = PluginType.SINK)
@DataPrepperPlugin(name = "file", pluginType = Sink.class)
public class FileSink implements Sink<Record<String>> {
private static final String SAMPLE_FILE_PATH = "src/resources/file-test-sample-output.txt";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.sink;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
Expand All @@ -20,7 +19,7 @@
import java.util.Collection;
import java.util.Iterator;

@DataPrepperPlugin(name = "stdout", type = PluginType.SINK)
@DataPrepperPlugin(name = "stdout", pluginType = Sink.class)
public class StdOutSink implements Sink<Record<String>> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.configuration.PluginSetting;
Expand All @@ -30,7 +29,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

@DataPrepperPlugin(name = "file", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "file", pluginType = Source.class)
public class FileSource implements Source<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
private static final String ATTRIBUTE_PATH = "path";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@

package com.amazon.dataprepper.plugins.source;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Generates a random string every 500 milliseconds. Intended to be used for testing setups
*/
@DataPrepperPlugin(name = "random", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "random", pluginType = Source.class)
public class RandomStringSource implements Source<Record<String>> {

private static Logger LOG = LoggerFactory.getLogger(RandomStringSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.configuration.PluginSetting;
Expand All @@ -30,7 +29,7 @@
* A simple source which reads data from console each line at a time. It exits when it reads case insensitive "exit"
* from console or if Pipeline notifies to stop.
*/
@DataPrepperPlugin(name = "stdin", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "stdin", pluginType = Source.class)
public class StdInSource implements Source<Record<String>> {
private static final Logger LOG = LoggerFactory.getLogger(StdInSource.class);
private static final String ATTRIBUTE_TIMEOUT = "write_timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@

package com.amazon.dataprepper.plugins;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;

import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "junit-test", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "junit-test", pluginType = Source.class)
public class ConstructorLessComponent implements Source<Record<String>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@

package com.amazon.dataprepper.plugins.buffer;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.CheckpointState;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.CheckpointState;

import java.util.AbstractMap;
import java.util.ArrayList;
Expand All @@ -27,7 +26,7 @@
import java.util.Queue;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "test_buffer", type = PluginType.BUFFER)
@DataPrepperPlugin(name = "test_buffer", pluginType = Buffer.class)
public class TestBuffer implements Buffer<Record<String>> {
private static final String ATTRIBUTE_BATCH_SIZE = "batch_size";
private static final String ATTRIBUTE_IMITATE_TIMEOUT = "imitate_timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@

package com.amazon.dataprepper.plugins.prepper;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.SingleThread;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.model.record.Record;

import java.util.Collection;

@SingleThread
@DataPrepperPlugin(name = "test_prepper", type = PluginType.PREPPER)
@DataPrepperPlugin(name = "test_prepper", pluginType = Prepper.class)
public class TestPrepper implements Prepper<Record<String>, Record<String>> {
public boolean isShutdown = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package com.amazon.dataprepper.plugins.sink;


import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.sink.Sink;
Expand All @@ -22,7 +21,7 @@
import java.util.List;
import java.util.stream.Collectors;

@DataPrepperPlugin(name = "test-sink", type = PluginType.SINK)
@DataPrepperPlugin(name = "test-sink", pluginType = Sink.class)
public class TestSink implements Sink<Record<String>> {
private final List<Record<String>> collectedRecords;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.record.Record;
Expand All @@ -23,7 +22,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

@DataPrepperPlugin(name = "test-source", type = PluginType.SOURCE)
@DataPrepperPlugin(name = "test-source", pluginType = Source.class)
public class TestSource implements Source<Record<String>> {
public static final List<Record<String>> TEST_DATA = Stream.of("THIS", "IS", "TEST", "DATA")
.map(Record::new).collect(Collectors.toList());
Expand Down
Loading