From a9bbfb8687c4d291533d7ea1a1b1a5053d6cf08c Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Tue, 23 Jan 2024 21:13:58 +0000 Subject: [PATCH 1/5] Aggregator processor should evaluate aggregate_when condition before forwarding events to remote peer Signed-off-by: Krishna Kondaka --- .../peerforwarder/RequiresPeerForwarding.java | 13 +++++++ .../RequiresPeerForwardingTest.java | 34 +++++++++++++++++++ .../PeerForwardingProcessorDecorator.java | 5 ++- .../dataprepper/pipeline/ProcessWorker.java | 4 +-- .../aggregate/AggregateProcessor.java | 16 +++++++++ .../aggregate/AggregateProcessorTest.java | 2 ++ 6 files changed, 71 insertions(+), 3 deletions(-) create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java index 2e46e89cdb..26ab0f2b8d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java @@ -4,6 +4,8 @@ */ package org.opensearch.dataprepper.model.peerforwarder; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; @@ -18,4 +20,15 @@ public interface RequiresPeerForwarding { * @return A set of keys */ Collection getIdentificationKeys(); + + /** + * Returns events that are applicable for peer forwarding. + * + * @param records collection of input records + * + * @return a collection of output records + */ + default Collection> applicableEventsForPeerForwarding(Collection> records) { + return records; + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java new file mode 100644 index 0000000000..b5a8e98d5b --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.peerforwarder; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.Event; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +import java.util.Collection; + +class RequiresPeerForwardingTest { + + public class SimpleRequiresPeerForwarding implements RequiresPeerForwarding { + @Override + public Collection getIdentificationKeys() { + return null; + } + } + + @Test + void testRequiresPeerForwardingTest() { + Collection> records = mock(Collection.class); + RequiresPeerForwarding requiresPeerForwarding = new SimpleRequiresPeerForwarding(); + assertThat(requiresPeerForwarding.applicableEventsForPeerForwarding(records), equalTo(records)); + } +} + + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index c3c0f9977b..c1949581e6 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -15,6 +15,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -79,7 +80,9 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina @Override public Collection> execute(final Collection> records) { - final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(records); + final Collection> recordsToProcess = ((RequiresPeerForwarding)innerProcessor).applicableEventsForPeerForwarding(records); + final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); + final Collection> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); final Collection> recordsToProcessLocally = CollectionUtils.union( diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index fb4effb413..8cfd1ddd43 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -93,8 +93,8 @@ public void run() { } } - private void processAcknowledgements(List inputEvents, Collection outputRecords) { - Set outputEventsSet = ((ArrayList>)outputRecords).stream().map(Record::getData).collect(Collectors.toSet()); + private void processAcknowledgements(List inputEvents, Collection> outputRecords) { + Set outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet()); // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it inputEvents.forEach(event -> { EventHandle eventHandle = event.getEventHandle(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index f44592c3a1..eca9496596 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -20,6 +20,7 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -148,6 +149,21 @@ public void shutdown() { } + @Override + public Collection> applicableEventsForPeerForwarding(Collection> records) { + if (whenCondition == null) { + return records; + } + final Collection> recordsOut = new ArrayList<>(); + for (Record record: records) { + Event event = record.getData(); + if (expressionEvaluator.evaluateConditional(whenCondition, event)) { + recordsOut.add(record); + } + } + return recordsOut; + } + @Override public Collection getIdentificationKeys() { return aggregateProcessorConfig.getIdentificationKeys(); diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index ad0d763078..38b8d5306f 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -228,6 +228,8 @@ void handleEvent_returning_with_condition_eliminates_one_record() { recordsIn.add(new Record(secondEvent)); recordsIn.add(new Record(event)); Collection> c = recordsIn; + Collection> applicableRecords = objectUnderTest.applicableEventsForPeerForwarding(recordsIn); + assertThat(applicableRecords.size(), equalTo(2)); final List> recordsOut = (List>) objectUnderTest.doExecute(c); assertThat(recordsOut.size(), equalTo(2)); From 5e2d2b5aa65e6fe0a070f4919ae0596cb25e8a2b Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 24 Jan 2024 02:29:41 +0000 Subject: [PATCH 2/5] Addressed review comments Signed-off-by: Krishna Kondaka --- .../peerforwarder/RequiresPeerForwarding.java | 10 +++--- .../RequiresPeerForwardingTest.java | 5 ++- .../PeerForwardingProcessorDecorator.java | 14 ++++++-- ...PeerForwardingProcessingDecoratorTest.java | 36 +++++++++++++++++-- .../aggregate/AggregateProcessor.java | 14 ++------ .../aggregate/AggregateProcessorTest.java | 5 +-- 6 files changed, 59 insertions(+), 25 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java index 26ab0f2b8d..caa2ce71e0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java @@ -22,13 +22,13 @@ public interface RequiresPeerForwarding { Collection getIdentificationKeys(); /** - * Returns events that are applicable for peer forwarding. + * Determines if an event should be forwarded to the remote peer * - * @param records collection of input records + * @param event input event * - * @return a collection of output records + * @return true if the event should be forwarded to the peer */ - default Collection> applicableEventsForPeerForwarding(Collection> records) { - return records; + default boolean isApplicableEventForPeerForwarding(Event event) { + return true; } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java index b5a8e98d5b..c2edf502df 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwardingTest.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.model.peerforwarder; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.event.Event; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -25,9 +24,9 @@ public Collection getIdentificationKeys() { @Test void testRequiresPeerForwardingTest() { - Collection> records = mock(Collection.class); + Event event = mock(Event.class); RequiresPeerForwarding requiresPeerForwarding = new SimpleRequiresPeerForwarding(); - assertThat(requiresPeerForwarding.applicableEventsForPeerForwarding(records), equalTo(records)); + assertThat(requiresPeerForwarding.isApplicableEventForPeerForwarding(event), equalTo(true)); } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index c1949581e6..fa85146db2 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -80,7 +80,15 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina @Override public Collection> execute(final Collection> records) { - final Collection> recordsToProcess = ((RequiresPeerForwarding)innerProcessor).applicableEventsForPeerForwarding(records); + final Collection> recordsToProcess = new ArrayList<>(); + final Collection> recordsSkipped = new ArrayList<>(); + for (Record record: records) { + if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) { + recordsToProcess.add(record); + } else { + recordsSkipped.add(record); + } + } final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); final Collection> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); @@ -88,7 +96,9 @@ public Collection> execute(final Collection> records final Collection> recordsToProcessLocally = CollectionUtils.union( recordsToProcessOnLocalPeer, receivedRecordsFromBuffer); - return innerProcessor.execute(recordsToProcessLocally); + Collection> recordsOut = innerProcessor.execute(recordsToProcessLocally); + recordsOut.addAll(recordsSkipped); + return recordsOut; } @Override diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index 3956c76efe..5a0a4907fa 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -29,6 +29,7 @@ import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.ArgumentMatchers.anyCollection; @@ -97,7 +98,6 @@ void decorateProcessors_with_different_identification_key_should_throw() { assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); } - @Test void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() { final List processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList()); @@ -180,6 +180,38 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut verify(processor).execute(anyCollection()); } + @Test + void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_overridden() { + Event event1 = mock(Event.class); + Event event2 = mock(Event.class); + Event event3 = mock(Event.class); + Record record1 = mock(Record.class); + Record record2 = mock(Record.class); + Record record3 = mock(Record.class); + Record aggregatedRecord = mock(Record.class); + List aggregatedRecords = new ArrayList<>(); + aggregatedRecords.add(aggregatedRecord); + when(processor.execute(anyCollection())).thenReturn(aggregatedRecords); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event1)).thenReturn(true); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true); + final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + when(record1.getData()).thenReturn(event1); + when(record2.getData()).thenReturn(event2); + when(record3.getData()).thenReturn(event3); + Collection> recordsIn = new ArrayList<>(); + recordsIn.add(record1); + recordsIn.add(record2); + recordsIn.add(record3); + + assertThat(processors.size(), equalTo(1)); + Collection> recordsOut = processors.get(0).execute(recordsIn); + verify(processor).execute(anyCollection()); + assertThat(recordsOut.size(), equalTo(2)); + assertTrue(recordsOut.contains(aggregatedRecord)); + assertTrue(recordsOut.contains(record2)); + } + @Test void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() { final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); @@ -208,4 +240,4 @@ void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutd } } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index eca9496596..e898840e2e 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -20,7 +20,6 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -150,18 +149,11 @@ public void shutdown() { } @Override - public Collection> applicableEventsForPeerForwarding(Collection> records) { + public boolean isApplicableEventForPeerForwarding(Event event) { if (whenCondition == null) { - return records; + return true; } - final Collection> recordsOut = new ArrayList<>(); - for (Record record: records) { - Event event = record.getData(); - if (expressionEvaluator.evaluateConditional(whenCondition, event)) { - recordsOut.add(record); - } - } - return recordsOut; + return expressionEvaluator.evaluateConditional(whenCondition, event); } @Override diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java index 38b8d5306f..9a439efea2 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorTest.java @@ -228,8 +228,9 @@ void handleEvent_returning_with_condition_eliminates_one_record() { recordsIn.add(new Record(secondEvent)); recordsIn.add(new Record(event)); Collection> c = recordsIn; - Collection> applicableRecords = objectUnderTest.applicableEventsForPeerForwarding(recordsIn); - assertThat(applicableRecords.size(), equalTo(2)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(true)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(true)); + assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false)); final List> recordsOut = (List>) objectUnderTest.doExecute(c); assertThat(recordsOut.size(), equalTo(2)); From 8fc255f551bda0892e09552226b07178337c2679 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 24 Jan 2024 19:58:59 +0000 Subject: [PATCH 3/5] Fixed check style error Signed-off-by: Krishna Kondaka --- .../dataprepper/model/peerforwarder/RequiresPeerForwarding.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java index caa2ce71e0..3f00b86bf0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/peerforwarder/RequiresPeerForwarding.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.model.peerforwarder; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; import java.util.Collection; From 472238ff2b8315adae68afedd8f4337b4e71fba6 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 24 Jan 2024 22:50:08 +0000 Subject: [PATCH 4/5] Fixed failing tests Signed-off-by: Krishna Kondaka --- .../PeerForwardingProcessorDecorator.java | 5 +++++ .../PeerForwardingProcessingDecoratorTest.java | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index fa85146db2..b58d449568 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -82,13 +82,17 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina public Collection> execute(final Collection> records) { final Collection> recordsToProcess = new ArrayList<>(); final Collection> recordsSkipped = new ArrayList<>(); + System.out.println("_____0____"+records.size()); for (Record record: records) { if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) { + System.out.println("_____1____"+record.getData().toJsonString()); recordsToProcess.add(record); } else { + System.out.println("_____2____"+record.getData().toJsonString()); recordsSkipped.add(record); } } + System.out.println("_____3____"+records.size()); final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); final Collection> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); @@ -97,6 +101,7 @@ public Collection> execute(final Collection> records recordsToProcessOnLocalPeer, receivedRecordsFromBuffer); Collection> recordsOut = innerProcessor.execute(recordsToProcessLocally); + System.out.println("====="+recordsOut+"====="+recordsSkipped); recordsOut.addAll(recordsSkipped); return recordsOut; } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index 5a0a4907fa..a9611178bc 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -129,9 +129,12 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific @Test void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correct_values() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); List> testData = Collections.singletonList(record); - when(peerForwarder.forwardRecords(testData)).thenReturn(testData); + when(peerForwarder.forwardRecords(anyCollection())).thenReturn(testData); when(processor.execute(testData)).thenReturn(testData); @@ -140,7 +143,7 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc final Collection> records = processors.get(0).execute(testData); verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); - verify(peerForwarder).forwardRecords(testData); + verify(peerForwarder).forwardRecords(anyCollection()); Assertions.assertNotNull(records); assertThat(records.size(), equalTo(testData.size())); assertThat(records, equalTo(testData)); @@ -148,10 +151,13 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc @Test void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); Collection> forwardTestData = Collections.singletonList(record); Collection> receiveTestData = Collections.singletonList(mock(Record.class)); - when(peerForwarder.forwardRecords(forwardTestData)).thenReturn(forwardTestData); + when(peerForwarder.forwardRecords(anyCollection())).thenReturn(forwardTestData); when(peerForwarder.receiveRecords()).thenReturn(receiveTestData); final Collection> expectedRecordsToProcessLocally = CollectionUtils.union(forwardTestData, receiveTestData); @@ -163,7 +169,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { final Collection> records = processors.get(0).execute(forwardTestData); verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); - verify(peerForwarder).forwardRecords(forwardTestData); + verify(peerForwarder).forwardRecords(anyCollection()); verify(peerForwarder).receiveRecords(); Assertions.assertNotNull(records); assertThat(records.size(), equalTo(expectedRecordsToProcessLocally.size())); @@ -172,6 +178,9 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { @Test void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execute() { + Event event = mock(Event.class); + when(record.getData()).thenReturn(event); + when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); From 7be54b7c4f8a06b7bb0f272904e469455d2870d9 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Wed, 24 Jan 2024 23:03:53 +0000 Subject: [PATCH 5/5] Fixed failing tests Signed-off-by: Krishna Kondaka --- .../peerforwarder/PeerForwardingProcessorDecorator.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index b58d449568..fa85146db2 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -82,17 +82,13 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina public Collection> execute(final Collection> records) { final Collection> recordsToProcess = new ArrayList<>(); final Collection> recordsSkipped = new ArrayList<>(); - System.out.println("_____0____"+records.size()); for (Record record: records) { if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) { - System.out.println("_____1____"+record.getData().toJsonString()); recordsToProcess.add(record); } else { - System.out.println("_____2____"+record.getData().toJsonString()); recordsSkipped.add(record); } } - System.out.println("_____3____"+records.size()); final Collection> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); final Collection> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); @@ -101,7 +97,6 @@ public Collection> execute(final Collection> records recordsToProcessOnLocalPeer, receivedRecordsFromBuffer); Collection> recordsOut = innerProcessor.execute(recordsToProcessLocally); - System.out.println("====="+recordsOut+"====="+recordsSkipped); recordsOut.addAll(recordsSkipped); return recordsOut; }