Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a config option to do node local aggregation #4306

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/aggregate-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ While not necessary, a great way to set up the Aggregate Processor [identificati
### <a name="when"></a>
* `when` (Optional): A `String` that represents a condition that must be evaluated to true for the aggregation to be applied on the event. Events that do not evaluate to true on the condition are skipped. Default is no condition which means all events are included in the aggregation.

### <a name="local_only"></a>
* `local_only` (Optional): A `Boolean` indicating if the aggregation should be done local to node instead of forwarding to remote peers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update this line.


## Available Aggregate Actions

### <a name="remove_duplicates"></a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private final AggregateAction aggregateAction;

private boolean forceConclude = false;
private boolean localOnly = false;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -69,6 +70,7 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
this.actionHandleEventsOutCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_OUT);
this.actionHandleEventsDroppedCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_DROPPED);
this.whenCondition = aggregateProcessorConfig.getWhenCondition();
this.localOnly = aggregateProcessorConfig.getLocalOnly();

pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize);
}
Expand Down Expand Up @@ -153,6 +155,9 @@ public boolean isApplicableEventForPeerForwarding(Event event) {
if (whenCondition == null) {
return true;
}
if (localOnly) {
return false;
}
return expressionEvaluator.evaluateConditional(whenCondition, event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class AggregateProcessorConfig {
@NotNull
private PluginModel aggregateAction;

@JsonProperty("local_only")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about naming this local_aggregations_only? It may make clear what is local only.

@NotNull
private Boolean localOnly = false;

@JsonProperty("aggregate_when")
private String whenCondition;

Expand All @@ -43,6 +47,10 @@ public String getWhenCondition() {
return whenCondition;
}

public Boolean getLocalOnly() {
return localOnly;
}

public PluginModel getAggregateAction() { return aggregateAction; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public void testDefault() {
final AggregateProcessorConfig aggregateConfig = new AggregateProcessorConfig();

assertThat(aggregateConfig.getGroupDuration(), equalTo(Duration.ofSeconds(AggregateProcessorConfig.DEFAULT_GROUP_DURATION_SECONDS)));
assertThat(aggregateConfig.getLocalOnly(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private AggregateProcessor createObjectUnderTest() {
@BeforeEach
void setUp() {
when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration);
when(aggregateProcessorConfig.getLocalOnly()).thenReturn(false);
when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -247,6 +248,69 @@ void handleEvent_returning_with_condition_eliminates_one_record() {
verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
final String eventKey = UUID.randomUUID().toString();
final String key1 = UUID.randomUUID().toString();
final String key2 = UUID.randomUUID().toString();
final String condition = "/" + eventKey + " == "+key1;
Event firstEvent;
Event secondEvent;
final Map<String, Object> eventMap1 = new HashMap<>();
eventMap1.put(eventKey, key1);

firstEvent = JacksonEvent.builder()
.withData(eventMap1)
.withEventType("event")
.build();

final Map<String, Object> eventMap2 = new HashMap<>();
eventMap2.put(eventKey, key2);

secondEvent = JacksonEvent.builder()
.withData(eventMap2)
.withEventType("event")
.build();


when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(aggregateProcessorConfig.getLocalOnly()).thenReturn(true);
final AggregateProcessor objectUnderTest = createObjectUnderTest();
when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.emptyList());
when(aggregateActionResponse.getEvent()).thenReturn(event);
when(firstAggregateActionResponse.getEvent()).thenReturn(firstEvent);

event.toMap().put(eventKey, key1);
List<Record<Event>> recordsIn = new ArrayList<>();
recordsIn.add(new Record<Event>(firstEvent));
recordsIn.add(new Record<Event>(secondEvent));
recordsIn.add(new Record<Event>(event));
Collection<Record<Event>> c = recordsIn;
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(event), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(firstEvent), equalTo(false));
assertThat(objectUnderTest.isApplicableEventForPeerForwarding(secondEvent), equalTo(false));
final List<Record<Event>> recordsOut = (List<Record<Event>>) objectUnderTest.doExecute(c);

assertThat(recordsOut.size(), equalTo(2));
assertThat(recordsOut.get(0), notNullValue());
assertThat(recordsOut.get(0).getData(), equalTo(firstEvent));
assertThat(recordsOut.get(1), notNullValue());
assertThat(recordsOut.get(1).getData(), equalTo(event));

verify(actionHandleEventsDroppedCounter).increment(1);
verify(actionHandleEventsOutCounter).increment(2);
verifyNoInteractions(actionConcludeGroupEventsDroppedCounter);
verifyNoInteractions(actionConcludeGroupEventsOutCounter);

verify(aggregateGroupManager).getGroupsToConclude(eq(false));
}

@Test
void handleEvent_returning_with_event_adds_event_to_records_out() {
final AggregateProcessor objectUnderTest = createObjectUnderTest();
Expand Down
Loading