Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple aggregate processors in local mode #4574

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DefaultPeerForwarderProvider implements PeerForwarderProvider {

private final PeerForwarderClientFactory peerForwarderClientFactory;
private final PeerForwarderClient peerForwarderClient;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PluginMetrics pluginMetrics;
private final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
private HashRing hashRing;

DefaultPeerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
final PluginMetrics pluginMetrics) {
this.peerForwarderClientFactory = peerForwarderClientFactory;
this.peerForwarderClient = peerForwarderClient;
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.pluginMetrics = pluginMetrics;
}

public PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys,
final Integer pipelineWorkerThreads) {
if (pipelinePeerForwarderReceiveBufferMap.containsKey(pipelineName) &&
pipelinePeerForwarderReceiveBufferMap.get(pipelineName).containsKey(pluginId)) {
throw new RuntimeException("Data Prepper 2.0 will only support a single peer-forwarder per pipeline/plugin type");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this say Data Prepper 2.x?

}

final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = createBufferPerPipelineProcessor(pipelineName, pluginId);

if (isPeerForwardingRequired()) {
if (hashRing == null) {
hashRing = peerForwarderClientFactory.createHashRing();
}
return new RemotePeerForwarder(
peerForwarderClient,
hashRing,
peerForwarderReceiveBuffer,
pipelineName,
pluginId,
identificationKeys,
pluginMetrics,
peerForwarderConfiguration.getBatchDelay(),
peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(),
peerForwarderConfiguration.getForwardingBatchSize(),
peerForwarderConfiguration.getForwardingBatchQueueDepth(),
peerForwarderConfiguration.getForwardingBatchTimeout(),
pipelineWorkerThreads
);
}
else {
return new LocalPeerForwarder();
}
}

private PeerForwarderReceiveBuffer<Record<Event>> createBufferPerPipelineProcessor(final String pipelineName, final String pluginId) {
final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = new
PeerForwarderReceiveBuffer<>(peerForwarderConfiguration.getBufferSize(), peerForwarderConfiguration.getBatchSize(), pipelineName, pluginId);

final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginsBufferMap =
pipelinePeerForwarderReceiveBufferMap.computeIfAbsent(pipelineName, k -> new HashMap<>());

pluginsBufferMap.put(pluginId, peerForwarderReceiveBuffer);

return peerForwarderReceiveBuffer;
}

public boolean isPeerForwardingRequired() {
return arePeersConfigured() && pipelinePeerForwarderReceiveBufferMap.size() > 0;
}

public boolean arePeersConfigured() {
final DiscoveryMode discoveryMode = peerForwarderConfiguration.getDiscoveryMode();
if (discoveryMode.equals(DiscoveryMode.LOCAL_NODE)) {
return false;
}
else if (discoveryMode.equals(DiscoveryMode.STATIC) && peerForwarderConfiguration.getStaticEndpoints().size() <= 1) {
return false;
}
return true;
}

public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return pipelinePeerForwarderReceiveBufferMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;

import java.util.Map;
import java.util.Set;

public class LocalModePeerForwarderProvider implements PeerForwarderProvider {

private final PeerForwarderProvider peerForwarderProvider;
private boolean isRemotePeerForwarderRegistered;

public LocalModePeerForwarderProvider(final PeerForwarderProvider peerForwarderProvider) {
this.peerForwarderProvider = peerForwarderProvider;
this.isRemotePeerForwarderRegistered = false;
}

@Override
public PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys, final Integer pipelineWorkerThreads) {
if (((RequiresPeerForwarding)processor).isForLocalProcessingOnly(null)) {
return new LocalPeerForwarder();
}
isRemotePeerForwarderRegistered = true;
return peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
}

@Override
public boolean isPeerForwardingRequired() {
return isRemotePeerForwarderRegistered;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little strange that there is no way to verify that register is called before this method, meaning that calling this method at first would return false, then after calling register it may return true. Not sure if it's worth finding a way to enforce this

}

@Override
public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return (isRemotePeerForwarderRegistered) ?
peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap() :
Map.of();
}

@Override
public boolean arePeersConfigured() {
return isRemotePeerForwarderRegistered ? peerForwarderProvider.arePeersConfigured() : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.inject.Named;


@Configuration
class PeerForwarderAppConfig {
Expand Down Expand Up @@ -71,12 +75,18 @@ public PeerForwarderClient peerForwarderClient(final PeerForwarderConfiguration
peerForwarderConfiguration, peerForwarderClientFactory, peerForwarderCodec, pluginMetrics);
}

@Bean
public PeerForwarderProvider peerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
@Bean(name = "defaultPeerForwarder")
public DefaultPeerForwarderProvider peerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
@Qualifier("peerForwarderMetrics") final PluginMetrics pluginMetrics) {
return new PeerForwarderProvider(peerForwarderClientFactory, peerForwarderClient, peerForwarderConfiguration, pluginMetrics);
return new DefaultPeerForwarderProvider(peerForwarderClientFactory, peerForwarderClient, peerForwarderConfiguration, pluginMetrics);
}

@Bean
@Primary
public PeerForwarderProvider peerForwarderProvider(@Named("defaultPeerForwarder") final PeerForwarderProvider peerForwarderProvider) {
return new LocalModePeerForwarderProvider(peerForwarderProvider);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,97 +5,49 @@

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PeerForwarderProvider {

private final PeerForwarderClientFactory peerForwarderClientFactory;
private final PeerForwarderClient peerForwarderClient;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PluginMetrics pluginMetrics;
private final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
private HashRing hashRing;

PeerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
final PluginMetrics pluginMetrics) {
this.peerForwarderClientFactory = peerForwarderClientFactory;
this.peerForwarderClient = peerForwarderClient;
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.pluginMetrics = pluginMetrics;
}

public PeerForwarder register(final String pipelineName, final String pluginId, final Set<String> identificationKeys,
final Integer pipelineWorkerThreads) {
if (pipelinePeerForwarderReceiveBufferMap.containsKey(pipelineName) &&
pipelinePeerForwarderReceiveBufferMap.get(pipelineName).containsKey(pluginId)) {
throw new RuntimeException("Data Prepper 2.0 will only support a single peer-forwarder per pipeline/plugin type");
}

final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = createBufferPerPipelineProcessor(pipelineName, pluginId);

if (isPeerForwardingRequired()) {
if (hashRing == null) {
hashRing = peerForwarderClientFactory.createHashRing();
}
return new RemotePeerForwarder(
peerForwarderClient,
hashRing,
peerForwarderReceiveBuffer,
pipelineName,
pluginId,
identificationKeys,
pluginMetrics,
peerForwarderConfiguration.getBatchDelay(),
peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(),
peerForwarderConfiguration.getForwardingBatchSize(),
peerForwarderConfiguration.getForwardingBatchQueueDepth(),
peerForwarderConfiguration.getForwardingBatchTimeout(),
pipelineWorkerThreads
);
}
else {
return new LocalPeerForwarder();
}
}

private PeerForwarderReceiveBuffer<Record<Event>> createBufferPerPipelineProcessor(final String pipelineName, final String pluginId) {
final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = new
PeerForwarderReceiveBuffer<>(peerForwarderConfiguration.getBufferSize(), peerForwarderConfiguration.getBatchSize(), pipelineName, pluginId);

final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginsBufferMap =
pipelinePeerForwarderReceiveBufferMap.computeIfAbsent(pipelineName, k -> new HashMap<>());

pluginsBufferMap.put(pluginId, peerForwarderReceiveBuffer);

return peerForwarderReceiveBuffer;
}

public boolean isPeerForwardingRequired() {
return arePeersConfigured() && pipelinePeerForwarderReceiveBufferMap.size() > 0;
}

private boolean arePeersConfigured() {
final DiscoveryMode discoveryMode = peerForwarderConfiguration.getDiscoveryMode();
if (discoveryMode.equals(DiscoveryMode.LOCAL_NODE)) {
return false;
}
else if (discoveryMode.equals(DiscoveryMode.STATIC) && peerForwarderConfiguration.getStaticEndpoints().size() <= 1) {
return false;
}
return true;
}

public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return pipelinePeerForwarderReceiveBufferMap;
}
public interface PeerForwarderProvider {
/**
* Registers a pipeline and identification keys
*
* @param pipelineName pipeline name
* @param processor processor
* @param pluginId plugin id
* @param identificationKeys identification keys
* @param pipelineWorkerThreads number of pipeline worker threads
* @return peer forwarder
* @since 2.9
*/
PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys, final Integer pipelineWorkerThreads);

/**
* Returns if peer forwarding required
*
* @return returns if peer forwarding required or nto
* @since 2.9
*/
boolean isPeerForwardingRequired();

/**
* Returns if peers configured
*
* @return returns if peers configured
* @since 2.9
*/
boolean arePeersConfigured();

/**
* Returns pipeline peer forwarder receive buffer map
*
* @return Map of buffer per pipeline per pluginId
* @since 2.9
*/
Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap();
}

Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ public static List<Processor> decorateProcessors(
"Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
return processors.stream()
.map(processor -> {
PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
return new PeerForwardingProcessorDecorator(peerForwarder, processor);
})
.collect(Collectors.toList());
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) {
Expand Down
Loading
Loading