Add Buffer Latency Metric (#4237)
* Add Buffer Latency Metric

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed failing code verification test by adding new test case

Signed-off-by: Krishna Kondaka <[email protected]>

* Added null check before calling updateLatency

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Mar 7, 2024
1 parent eb27384 commit 1b02bfb
Showing 6 changed files with 77 additions and 2 deletions.
@@ -86,4 +86,9 @@ private MetricNames() {}
* Delimiter used to separate path components in metric names.
*/
public static final String DELIMITER = ".";

/**
* Metric representing buffer read latency
*/
public static final String READ_LATENCY = "readLatency";
}
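
For context, here is a rough sketch of how the new constant might surface as a fully qualified metric name once `PluginMetrics` scopes it by pipeline and plugin. The exact prefixing and the pipeline/buffer names below are assumptions for illustration, not taken from this commit:

```java
// Illustrative only: hypothetical pipeline and buffer names; the real prefixing
// is handled by Data Prepper's PluginMetrics, which this sketch does not use.
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ReadLatencyNameSketch {
    public static void main(String[] args) {
        final String pipelineName = "log-pipeline";    // hypothetical
        final String bufferName = "bounded_blocking";  // hypothetical
        final String delimiter = ".";                  // mirrors MetricNames.DELIMITER
        final String readLatency = "readLatency";      // mirrors MetricNames.READ_LATENCY

        final String qualifiedName = pipelineName + delimiter + bufferName + delimiter + readLatency;

        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        final Timer timer = registry.timer(qualifiedName);
        System.out.println(timer.getId().getName()); // log-pipeline.bounded_blocking.readLatency
    }
}
```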
@@ -12,7 +12,10 @@
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Instant;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,6 +35,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> {
private final Counter writeTimeoutCounter;
private final Counter recordsWriteFailed;
private final Timer writeTimer;
private final Timer latencyTimer;
private final Timer readTimer;
private final Timer checkpointTimer;

Expand All @@ -54,6 +58,7 @@ private AbstractBuffer(final PluginMetrics pluginMetrics, final String pipelineN
this.writeTimeoutCounter = pluginMetrics.counter(MetricNames.WRITE_TIMEOUTS);
this.writeTimer = pluginMetrics.timer(MetricNames.WRITE_TIME_ELAPSED);
this.readTimer = pluginMetrics.timer(MetricNames.READ_TIME_ELAPSED);
this.latencyTimer = pluginMetrics.timer(MetricNames.READ_LATENCY);
this.checkpointTimer = pluginMetrics.timer(MetricNames.CHECKPOINT_TIME_ELAPSED);
}

@@ -142,6 +147,23 @@ public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
return readResult;
}

Timer getLatencyTimer() {
return latencyTimer;
}

protected void updateLatency(Collection<T> records) {
for (T rec : records) {
if (rec instanceof Record) {
Object data = rec.getData();
if (data instanceof Event) {
Event event = (Event) data;
Instant receivedTime = event.getEventHandle().getInternalOriginationTime();
latencyTimer.record(Duration.between(receivedTime, Instant.now()));
}
}
}
}

@Override
public void checkpoint(final CheckpointState checkpointState) {
checkpointTimer.record(() -> doCheckpoint(checkpointState));
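
The heart of the change is `updateLatency`: for every `Event` read out of the buffer, it records the gap between the event's internal origination time and the moment of the read on the new `readLatency` timer. Below is a minimal, self-contained sketch of that pattern using plain Micrometer, with no Data Prepper types; the sleep and event times are invented so the recorded latency is visible:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class BufferReadLatencySketch {
    public static void main(String[] args) throws InterruptedException {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        final Timer latencyTimer = registry.timer("readLatency");

        // Pretend these events entered the buffer a moment ago.
        final List<Instant> originationTimes = List.of(Instant.now(), Instant.now());
        Thread.sleep(25); // simulate time spent sitting in the buffer

        // Same idea as AbstractBuffer#updateLatency: one sample per record read.
        for (final Instant receivedTime : originationTimes) {
            latencyTimer.record(Duration.between(receivedTime, Instant.now()));
        }

        System.out.printf("samples=%d, total=%.1f ms%n",
                latencyTimer.count(), latencyTimer.totalTime(TimeUnit.MILLISECONDS));
    }
}
```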
@@ -14,6 +14,8 @@
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.util.AbstractMap;
import java.util.ArrayList;
@@ -36,6 +38,7 @@
public class AbstractBufferTest {
private static final String BUFFER_NAME = "testBuffer";
private static final String PIPELINE_NAME = "pipelineName";
private static final String TEST_MESSAGE = "testMessage";

private PluginSetting testPluginSetting;

@@ -222,6 +225,16 @@ public void testWriteAllTimeoutMetric() throws TimeoutException {
assertEquals(1.0, timeoutMeasurements.get(0).getValue(), 0);
}

@Test
public void testUpdateLatency() {
final AbstractBuffer<Record<Event>> abstractBuffer = new AbstractBufferEventImpl(testPluginSetting);
final Collection<Record<Event>> testRecords = Arrays.asList(
new Record<>(JacksonEvent.fromMessage(TEST_MESSAGE)));
abstractBuffer.updateLatency(testRecords);
assertEquals(abstractBuffer.getLatencyTimer().count(), 1);

}

@Test
public void testWriteAllRecordsWriteFailedMetric() {
// Given
@@ -387,4 +400,33 @@ public void doWriteAll(Collection<Record<String>> records, int timeoutInMillis)
throw new NullPointerException();
}
}

public static class AbstractBufferEventImpl extends AbstractBuffer<Record<Event>> {
public AbstractBufferEventImpl(PluginSetting pluginSetting) {
super(pluginSetting);
}

@Override
public void doWrite(Record<Event> record, int timeoutInMillis) throws TimeoutException {
}

@Override
public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
}

@Override
public void doCheckpoint(final CheckpointState checkpointState) {

}

@Override
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
return null;
}

@Override
public boolean isEmpty() {
return true;
}
}
}
@@ -122,6 +122,7 @@ public Map.Entry<Collection<T>, CheckpointState> doRead(final int timeoutInMilli
}
}

updateLatency((Collection<T>)records);
final CheckpointState checkpointState = new CheckpointState(recordsRead);
recordsInFlight += checkpointState.getNumRecordsToBeChecked();
return new AbstractMap.SimpleEntry<>(records, checkpointState);
@@ -176,6 +176,7 @@ public Map.Entry<Collection<T>, CheckpointState> doRead(int timeoutInMillis) {
}
}

updateLatency((Collection<T>)records);
final CheckpointState checkpointState = new CheckpointState(recordsRead);
return new AbstractMap.SimpleEntry<>(records, checkpointState);
}
@@ -70,7 +70,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
final AcknowledgementSetManager acknowledgementSetManager,
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier,
final CircuitBreaker circuitBreaker) {
- super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()), pluginSetting.getPipelineName());
+ super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName());
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory());
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
this.byteDecoder = byteDecoder;
@@ -141,7 +141,11 @@ public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) t
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
try {
setMdc();
- return innerBuffer.read(timeoutInMillis);
+ Map.Entry<Collection<Record<Event>>, CheckpointState> result = innerBuffer.read(timeoutInMillis);
+ if (result != null) {
+     updateLatency(result.getKey());
+ }
+ return result;
} finally {
resetMdc();
}
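
In `KafkaBuffer`, the read path delegates to an inner buffer, and, per the last review round, `updateLatency` is only called when that read actually returned a result. Below is a stripped-down sketch of that delegate-and-guard shape; the types are stand-ins for illustration, not the real buffer interfaces:

```java
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class GuardedDelegateReadSketch {
    // Stand-in for the inner buffer's read: records plus a checkpoint token.
    static Map.Entry<Collection<String>, Integer> innerRead(final int timeoutInMillis) {
        if (timeoutInMillis <= 0) {
            return null; // assume the inner read may yield nothing
        }
        return new AbstractMap.SimpleEntry<>(List.of("event-1", "event-2"), 2);
    }

    // Stand-in for AbstractBuffer#updateLatency.
    static void updateLatency(final Collection<String> records) {
        System.out.println("recording read latency for " + records.size() + " record(s)");
    }

    public static void main(String[] args) {
        final Map.Entry<Collection<String>, Integer> result = innerRead(100);
        if (result != null) { // null check added before calling updateLatency
            updateLatency(result.getKey());
        }
    }
}
```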
