Skip to content

Commit

Permalink
Add a new metric for OpenSearch Sink plugin: bulkRequestSizeBytes
Browse files Browse the repository at this point in the history
Signed-off-by: Han Jiang <[email protected]>
  • Loading branch information
jianghancn committed Nov 12, 2021
1 parent 52e0468 commit a456b14
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

@DataPrepperPlugin(name = "opensearch", pluginType = Sink.class)
public class OpenSearchSink extends AbstractSink<Record<Object>> {
public static final String BULKREQUEST_LATENCY = "bulkRequestLatency";
public static final String BULKREQUEST_ERRORS = "bulkRequestErrors";
public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes";

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
// Pulled from BulkRequest to make estimation of bytes consistent
Expand All @@ -68,10 +70,13 @@ public class OpenSearchSink extends AbstractSink<Record<Object>> {

private final Timer bulkRequestTimer;
private final Counter bulkRequestErrorsCounter;
private final AtomicLong bulkRequestSizeBytes;

public OpenSearchSink(final PluginSetting pluginSetting) {
super(pluginSetting);
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
bulkRequestSizeBytes = pluginMetrics.gauge(BULKREQUEST_SIZE_BYTES, new AtomicLong());

this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
Expand Down Expand Up @@ -165,6 +170,7 @@ private void flushBatch(final BulkRequest bulkRequest) {
bulkRequestTimer.record(() -> {
try {
bulkRetryStrategy.execute(bulkRequest);
bulkRequestSizeBytes.addAndGet(bulkRequest.estimatedSizeInBytes());
} catch (final InterruptedException e) {
LOG.error("Unexpected Interrupt:", e);
bulkRequestErrorsCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(0.0, documentErrorsMeasurements.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(1, bulkRequestSizeBytesMetrics.size());
assertEquals(2188.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);
}

public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
Expand Down Expand Up @@ -233,6 +242,15 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(1, bulkRequestSizeBytesMetrics.size());
assertEquals(2181.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);
}

public void testInstantiateSinkServiceMapDefault() throws IOException {
Expand Down Expand Up @@ -277,6 +295,15 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
// COUNT
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(1, bulkRequestSizeBytesMetrics.size());
assertEquals(309.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);

// Check restart for index already exists
sink = new OpenSearchSink(pluginSetting);
sink.shutdown();
Expand Down

0 comments on commit a456b14

Please sign in to comment.