Skip to content

Commit

Permalink
Do not send empty lists to the DLQ when all items share the same retr…
Browse files Browse the repository at this point in the history
…yable failure. Resolves #3644 (#3660)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 15, 2023
1 parent ba8cf9d commit ae04689
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ public static boolean canRetry(final Exception e) {
}

private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry,
final int retryCount,
final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
final boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
final int retryCount,
final BulkResponse bulkResponse,
final Exception exceptionFromRequest) {
final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest);
if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() == null) {
Expand All @@ -241,8 +241,8 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
}
if (doRetry) {
if (retryCount % 5 == 0) {
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, e);
if (e == null) {
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest);
if (exceptionFromRequest == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() != null) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
Expand All @@ -253,7 +253,7 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
} else {
handleFailures(bulkRequestForRetry, bulkResponse, e);
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
}
return null;
}
Expand Down Expand Up @@ -335,7 +335,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
}
index++;
}
logFailure.accept(nonRetryableFailures.build(), null);
final ImmutableList<FailedBulkOperation> failedBulkOperations = nonRetryableFailures.build();
if(!failedBulkOperations.isEmpty()) {
logFailure.accept(failedBulkOperations, null);
}
return requestToReissue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import io.micrometer.core.instrument.Measurement;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -32,6 +31,7 @@
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
Expand All @@ -41,15 +41,18 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.VERSION_CONFLICT_EXCEPTION_TYPE;

Expand Down Expand Up @@ -236,15 +239,15 @@ public void testExecuteRetryable() throws Exception {
final ArgumentCaptor<List<FailedBulkOperation>> failedBulkOperationsCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<Throwable> throwableArgCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(logFailureConsumer).accept(failedBulkOperationsCaptor.capture(), throwableArgCaptor.capture());
MatcherAssert.assertThat(failedBulkOperationsCaptor.getValue(), notNullValue());
assertThat(failedBulkOperationsCaptor.getValue(), notNullValue());

final List<FailedBulkOperation> failedBulkOperations = failedBulkOperationsCaptor.getValue();
MatcherAssert.assertThat(failedBulkOperations.size(), equalTo(1));
assertThat(failedBulkOperations.size(), equalTo(1));

final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperations.get(0).getBulkOperation();
final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation();
MatcherAssert.assertThat(bulkOperation.index().index(), equalTo(testIndex));
MatcherAssert.assertThat(bulkOperation.index().id(), equalTo("2"));
assertThat(bulkOperation.index().index(), equalTo(testIndex));
assertThat(bulkOperation.index().id(), equalTo("2"));

// verify metrics
final List<Measurement> documentsSuccessFirstAttemptMeasurements = MetricsTestUtil.getMeasurementList(
Expand Down Expand Up @@ -296,13 +299,13 @@ public void testExecuteNonRetryableException() throws Exception {
verify(logFailureConsumer)
.accept(dlqObjectsArgCaptor.capture(), throwableArgCaptor.capture());
final List<FailedBulkOperation> failedBulkOperations = dlqObjectsArgCaptor.getValue();
MatcherAssert.assertThat(failedBulkOperations.size(), equalTo(4));
assertThat(failedBulkOperations.size(), equalTo(4));
AtomicInteger expectedIndexId = new AtomicInteger(1);
failedBulkOperations.forEach(failedBulkOperation -> {
final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperation.getBulkOperation();
final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation();
MatcherAssert.assertThat(bulkOperation.index().index(), equalTo(testIndex));
MatcherAssert.assertThat(bulkOperation.index().id(), equalTo(String.valueOf(expectedIndexId.get())));
assertThat(bulkOperation.index().index(), equalTo(testIndex));
assertThat(bulkOperation.index().id(), equalTo(String.valueOf(expectedIndexId.get())));
expectedIndexId.addAndGet(1);
});

Expand Down Expand Up @@ -350,7 +353,7 @@ public void testExecuteWithMaxRetries() throws Exception {
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4));
bulkRetryStrategy.execute(accumulatingBulkRequest);
MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true));
assertThat(maxRetriesLimitReached, equalTo(true));
assertEquals(numEventsSucceeded, 0);
assertEquals(numEventsFailed, 4);
}
Expand Down Expand Up @@ -380,7 +383,7 @@ public void testExecuteWithMaxRetriesWithException() throws Exception {
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation3).build(), eventHandle3));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4));
bulkRetryStrategy.execute(accumulatingBulkRequest);
MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true));
assertThat(maxRetriesLimitReached, equalTo(true));
assertEquals(numEventsSucceeded, 0);
assertEquals(numEventsFailed, 4);
}
Expand Down Expand Up @@ -412,7 +415,7 @@ public void testExecuteWithMaxRetriesAndSuccesses() throws Exception {
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation4).build(), eventHandle4));
accumulatingBulkRequest.addOperation(new BulkOperationWrapper(new BulkOperation.Builder().index(indexOperation5).build(), eventHandle5));
bulkRetryStrategy.execute(accumulatingBulkRequest);
MatcherAssert.assertThat(maxRetriesLimitReached, equalTo(true));
assertThat(maxRetriesLimitReached, equalTo(true));
assertEquals(numEventsSucceeded, 2);
assertEquals(numEventsFailed, 2);

Expand Down Expand Up @@ -456,15 +459,15 @@ public void testExecuteNonRetryableResponse() throws Exception {
verify(logFailureConsumer, times(1))
.accept(failedBulkOperationsCaptor.capture(), throwableArgCaptor.capture());
final List<FailedBulkOperation> failedBulkOperations = failedBulkOperationsCaptor.getValue();
MatcherAssert.assertThat(failedBulkOperations.size(), equalTo(3));
assertThat(failedBulkOperations.size(), equalTo(3));
AtomicInteger expectedIndexId = new AtomicInteger(1);

failedBulkOperations.forEach(failedBulkOperation -> {
expectedIndexId.addAndGet(1);
final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperation.getBulkOperation();
final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation();
MatcherAssert.assertThat(bulkOperation.index().index(), equalTo(testIndex));
MatcherAssert.assertThat(bulkOperation.index().id(), equalTo(String.valueOf(expectedIndexId.get())));
assertThat(bulkOperation.index().index(), equalTo(testIndex));
assertThat(bulkOperation.index().id(), equalTo(String.valueOf(expectedIndexId.get())));
});

// verify metrics
Expand All @@ -480,6 +483,75 @@ public void testExecuteNonRetryableResponse() throws Exception {
assertEquals(3.0, documentErrorsMeasurements.get(0).getValue(), 0);
}

@Test
void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status() throws Exception {
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = mock(Supplier.class);

final int maxRetries = 3;
final BulkRetryStrategy objectUnderTest = createObjectUnderTest(
requestFunction, logFailureConsumer, maxRetries,
bulkRequestSupplier);

final List<BulkOperationWrapper> operations = new ArrayList<>();
final List<BulkResponseItem> responseItems = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
operations.add(bulkOperationWrapper);

final BulkResponseItem responseItem = mock(BulkResponseItem.class);

when(responseItem.error()).thenReturn(mock(ErrorCause.class));
when(responseItem.status()).thenReturn(403);
responseItems.add(responseItem);
}
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest = mock(AccumulatingBulkRequest.class);
when(bulkRequest.getOperationsCount()).thenReturn(operations.size());
when(bulkRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0)));

final BulkResponse allFailingItemsResponse = mock(BulkResponse.class);
when(allFailingItemsResponse.errors()).thenReturn(true);
when(allFailingItemsResponse.items()).thenReturn(responseItems);


final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> reissueRequest = mock(AccumulatingBulkRequest.class);
when(reissueRequest.getOperationsCount()).thenReturn(operations.size());
when(reissueRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0)));
when(reissueRequest.getOperations()).thenReturn(operations);

when(requestFunction.apply(bulkRequest)).thenReturn(allFailingItemsResponse);
when(requestFunction.apply(reissueRequest)).thenReturn(allFailingItemsResponse);

when(bulkRequestSupplier.get()).thenReturn(reissueRequest);

objectUnderTest.execute(bulkRequest);

for (int i = 0; i < operations.size(); i++) {
final BulkOperationWrapper operation = operations.get(i);
verify(reissueRequest, times(maxRetries - 1)).addOperation(operation);
verify(reissueRequest, times(1)).getOperationAt(i);
}
verify(reissueRequest, times(maxRetries)).getOperationsCount();
verify(reissueRequest).getOperations();
verifyNoMoreInteractions(reissueRequest);

final ArgumentCaptor<List<FailedBulkOperation>> actualFailedOperationsCaptor = ArgumentCaptor.forClass(List.class);
verify(logFailureConsumer).accept(actualFailedOperationsCaptor.capture(), any());

final List<FailedBulkOperation> failedBulkOperations = actualFailedOperationsCaptor.getValue();
assertThat(failedBulkOperations.size(), equalTo(operations.size()));
for (int i = 0; i < operations.size(); i++) {
final FailedBulkOperation failedBulkOperation = failedBulkOperations.get(i);
final BulkOperationWrapper operation = operations.get(i);

assertThat(failedBulkOperation, notNullValue());
assertThat(failedBulkOperation.getBulkOperation(), equalTo(operation));
assertThat(failedBulkOperation.getFailure(), notNullValue());
}

verifyNoMoreInteractions(logFailureConsumer);
}

private static BulkResponseItem successItemResponse(final String index) {
return mock(BulkResponseItem.class);
Expand Down Expand Up @@ -607,7 +679,7 @@ private BulkResponse bulkMaxRetriesResponseWithSuccesses(final BulkRequest bulkR

private BulkResponse bulkMaxRetriesResponse(final BulkRequest bulkRequest) {
final int requestSize = bulkRequest.operations().size();
MatcherAssert.assertThat(requestSize, equalTo(4));
assertThat(requestSize, equalTo(4));
final List<BulkResponseItem> bulkItemResponses = Arrays.asList(
internalServerErrorItemResponse(index),
tooManyRequestItemResponse(index),
Expand Down

0 comments on commit ae04689

Please sign in to comment.