Skip to content

Commit

Permalink
[fix][client] Seek should be thread-safe (#20242)
Browse files Browse the repository at this point in the history
  • Loading branch information
JooHyukKim authored May 8, 2023
1 parent 3c80af2 commit bc1764f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -251,10 +252,12 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
Expand Down Expand Up @@ -319,21 +322,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
}

this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
NonPersistentAcknowledgmentGroupingTracker.of();
NonPersistentAcknowledgmentGroupingTracker.of();
}

if (conf.getDeadLetterPolicy() != null) {
Expand Down Expand Up @@ -410,16 +413,16 @@ public CompletableFuture<Void> unsubscribeAsync() {
log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
}
return unsubscribeFuture;
}
Expand Down Expand Up @@ -1400,7 +1403,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
}

private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
MessageIdData messageId, ClientCnx cnx) {

// Lazy task scheduling to expire incomplete chunk message
if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false,
Expand Down Expand Up @@ -1445,7 +1448,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() > (msgMetadata.getPublishTime()
+ expireTimeOfIncompleteChunkedMessageMillis)) {
+ expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
Expand Down Expand Up @@ -1636,7 +1639,7 @@ protected void trackMessage(Message<?> msg) {
}

protected void trackMessage(MessageId messageId) {
trackMessage(messageId, 0);
trackMessage(messageId, 0);
}

protected void trackMessage(MessageId messageId, int redeliveryCount) {
Expand Down Expand Up @@ -1777,7 +1780,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo
}

private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
ClientCnx currentCnx, boolean checkMaxMessageSize) {
ClientCnx currentCnx, boolean checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
Expand Down Expand Up @@ -1830,7 +1833,7 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC
}

private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
discardMessage(messageId, currentCnx, validationError);
Expand Down Expand Up @@ -2022,8 +2025,8 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
Expand Down Expand Up @@ -2052,7 +2055,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
}
result.complete(false);
return null;
});
});
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex);
Expand Down Expand Up @@ -2151,9 +2154,15 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();

if (!duringSeek.compareAndSet(false, true)) {
log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
topic, subscription, seekBy);
seekFuture.cancel(true);
return seekFuture;
}

MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
Expand All @@ -2171,9 +2180,9 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
Expand All @@ -2185,7 +2194,7 @@ public CompletableFuture<Void> seekAsync(long timestamp) {
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
MessageId.earliest, seekBy);
});
}

Expand Down Expand Up @@ -2351,10 +2360,11 @@ public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s "
+ "getting the last message id", consumerName, subscription, topicName.toString())));
}
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s "
+ "getting the last message id", consumerName, subscription,
topicName.toString())));
}

AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
Expand All @@ -2376,11 +2386,12 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
if (isConnected() && cnx != null) {
if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
future.completeExceptionally(
new PulsarClientException.NotSupportedException(
String.format("The command `GetLastMessageId` is not supported for the protocol version %d. "
+ "The consumer is %s, topic %s, subscription %s",
cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
new PulsarClientException.NotSupportedException(
String.format(
"The command `GetLastMessageId` is not supported for the protocol version %d. "
+ "The consumer is %s, topic %s, subscription %s",
cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
return;
}

Expand All @@ -2399,31 +2410,31 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());
}

MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
? new MessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition())
lastMessageId.getEntryId(), lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(),
lastMessageId.getPartition(), lastMessageId.getBatchIndex());
lastMessageId.getPartition(), lastMessageId.getBatchIndex());

future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
return null;
});
} else {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id "
+ "withing configured timeout", subscription, topicName.toString())));
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id "
+ "withing configured timeout", subscription, topicName.toString())));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -259,4 +262,26 @@ public void testTopicPriorityLevel() {

assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}

@Test(invocationTimeOut = 1000)
public void testSeekAsyncInternal() {
// given
ClientCnx cnx = mock(ClientCnx.class);
CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);

consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);

// when
CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
CompletableFuture<Void> secondResult = consumer.seekAsync(1L);

clientReq.complete(null);

// then
assertTrue(firstResult.isDone());
assertTrue(secondResult.isCancelled());
verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
}
}

0 comments on commit bc1764f

Please sign in to comment.