-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Aggregator processor should evaluate aggregate_when condition before forwarding events to remote peer #4004
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.model.peerforwarder; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.mockito.Mockito.mock; | ||
|
||
import java.util.Collection; | ||
|
||
class RequiresPeerForwardingTest { | ||
|
||
public class SimpleRequiresPeerForwarding implements RequiresPeerForwarding { | ||
@Override | ||
public Collection<String> getIdentificationKeys() { | ||
return null; | ||
} | ||
} | ||
|
||
@Test | ||
void testRequiresPeerForwardingTest() { | ||
Collection<Record<Event>> records = mock(Collection.class); | ||
RequiresPeerForwarding requiresPeerForwarding = new SimpleRequiresPeerForwarding(); | ||
assertThat(requiresPeerForwarding.applicableEventsForPeerForwarding(records), equalTo(records)); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
|
||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
@@ -79,7 +80,9 @@ private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, fina | |
|
||
@Override | ||
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) { | ||
final Collection<Record<Event>> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(records); | ||
final Collection<Record<Event>> recordsToProcess = ((RequiresPeerForwarding)innerProcessor).applicableEventsForPeerForwarding(records); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At the end of this method, we return:
The collection returned by the processor will be the input into the next processor. With this PR, the returned collection is going to be inaccurate when We need something like the following. I'll use
We'll definitely need a unit test here. It would be ideal to also have a core integration test. |
||
final Collection<Record<Event>> recordsToProcessOnLocalPeer = peerForwarder.forwardRecords(recordsToProcess); | ||
|
||
final Collection<Record<Event>> receivedRecordsFromBuffer = peerForwarder.receiveRecords(); | ||
|
||
final Collection<Record<Event>> recordsToProcessLocally = CollectionUtils.union( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import io.micrometer.core.instrument.Counter; | ||
import org.opensearch.dataprepper.plugins.hasher.IdentificationKeysHasher; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
|
@@ -148,6 +149,21 @@ public void shutdown() { | |
|
||
} | ||
|
||
@Override | ||
public Collection<Record<Event>> applicableEventsForPeerForwarding(Collection<Record<Event>> records) { | ||
if (whenCondition == null) { | ||
return records; | ||
} | ||
final Collection<Record<Event>> recordsOut = new ArrayList<>(); | ||
for (Record<Event> record: records) { | ||
Event event = record.getData(); | ||
if (expressionEvaluator.evaluateConditional(whenCondition, event)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's share the same logic here as we use in the Maybe you could make a method that evaluates the conditional in both.
|
||
recordsOut.add(record); | ||
} | ||
} | ||
return recordsOut; | ||
} | ||
|
||
@Override | ||
public Collection<String> getIdentificationKeys() { | ||
return aggregateProcessorConfig.getIdentificationKeys(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for adding this test!