From bc1764f9ef71dd31e8cd61c7571e493442bc6395 Mon Sep 17 00:00:00 2001 From: "Kim, Joo Hyuk" Date: Mon, 8 May 2023 20:57:20 +0900 Subject: [PATCH] [fix][client] Seek should be thread-safe (#20242) --- .../pulsar/client/impl/ConsumerImpl.java | 109 ++++++++++-------- .../pulsar/client/impl/ConsumerImplTest.java | 27 ++++- 2 files changed, 86 insertions(+), 50 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 199e8a9ae71b4..6c64fe0406954 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -208,6 +208,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -251,10 +252,12 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, } protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, - ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, - boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, MessageId startMessageId, - long startMessageRollbackDurationInSec, Schema schema, ConsumerInterceptors interceptors, - boolean createTopicIfDoesNotExist) { + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, + boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, + MessageId startMessageId, + long startMessageRollbackDurationInSec, Schema schema, + ConsumerInterceptors interceptors, + boolean createTopicIfDoesNotExist) { super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors); this.consumerId = client.newConsumerId(); @@ -319,21 +322,21 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat } this.connectionHandler = new ConnectionHandler(this, - new BackoffBuilder() - .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), - TimeUnit.NANOSECONDS) - .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) - .setMandatoryStop(0, TimeUnit.MILLISECONDS) - .create(), + new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(), this); this.topicName = TopicName.get(topic); if (this.topicName.isPersistent()) { this.acknowledgmentsGroupingTracker = - new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); + new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); } else { this.acknowledgmentsGroupingTracker = - NonPersistentAcknowledgmentGroupingTracker.of(); + NonPersistentAcknowledgmentGroupingTracker.of(); } if (conf.getDeadLetterPolicy() != null) { @@ -410,16 +413,16 @@ public CompletableFuture unsubscribeAsync() { log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage()); setState(State.Ready); unsubscribeFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to unsubscribe the subscription %s of topic %s", - topicName.toString(), subscription))); + PulsarClientException.wrap(e.getCause(), + String.format("Failed to unsubscribe the subscription %s of topic %s", + topicName.toString(), subscription))); return null; }); } else { unsubscribeFuture.completeExceptionally( - new PulsarClientException( - String.format("The client is not connected to the broker when unsubscribing the " - + "subscription %s of the topic %s", subscription, topicName.toString()))); + new PulsarClientException( + String.format("The client is not connected to the broker when unsubscribing the " + + "subscription %s of the topic %s", subscription, topicName.toString()))); } return unsubscribeFuture; } @@ -1400,7 +1403,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien } private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId, - MessageIdData messageId, ClientCnx cnx) { + MessageIdData messageId, ClientCnx cnx) { // Lazy task scheduling to expire incomplete chunk message if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false, @@ -1445,7 +1448,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m increaseAvailablePermits(cnx); if (expireTimeOfIncompleteChunkedMessageMillis > 0 && System.currentTimeMillis() > (msgMetadata.getPublishTime() - + expireTimeOfIncompleteChunkedMessageMillis)) { + + expireTimeOfIncompleteChunkedMessageMillis)) { doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null); } else { trackMessage(msgId); @@ -1636,7 +1639,7 @@ protected void trackMessage(Message msg) { } protected void trackMessage(MessageId messageId) { - trackMessage(messageId, 0); + trackMessage(messageId, 0); } protected void trackMessage(MessageId messageId, int redeliveryCount) { @@ -1777,7 +1780,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo } private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx, boolean checkMaxMessageSize) { + ClientCnx currentCnx, boolean checkMaxMessageSize) { CompressionType compressionType = msgMetadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); @@ -1830,7 +1833,7 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC } private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, - ValidationError validationError) { + ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); discardMessage(messageId, currentCnx, validationError); @@ -2022,8 +2025,8 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) String originTopicNameStr = getOriginTopicNameStr(message); TypedMessageBuilder typedMessageBuilderNew = producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(message.getData()) - .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); + .value(message.getData()) + .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); if (message.hasKey()) { typedMessageBuilderNew.key(message.getKey()); } @@ -2052,7 +2055,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } result.complete(false); return null; - }); + }); } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); @@ -2151,9 +2154,15 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, final CompletableFuture seekFuture = new CompletableFuture<>(); ClientCnx cnx = cnx(); + if (!duringSeek.compareAndSet(false, true)) { + log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}", + topic, subscription, seekBy); + seekFuture.cancel(true); + return seekFuture; + } + MessageIdAdv originSeekMessageId = seekMessageId; seekMessageId = (MessageIdAdv) seekId; - duringSeek.set(true); log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); cnx.sendRequestWithId(seek, requestId).thenRun(() -> { @@ -2171,9 +2180,9 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); seekFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to seek the subscription %s of the topic %s to %s", - subscription, topicName.toString(), seekBy))); + PulsarClientException.wrap(e.getCause(), + String.format("Failed to seek the subscription %s of the topic %s to %s", + subscription, topicName.toString(), seekBy))); return null; }); return seekFuture; @@ -2185,7 +2194,7 @@ public CompletableFuture seekAsync(long timestamp) { return seekAsyncCheckState(seekBy).orElseGet(() -> { long requestId = client.newRequestId(); return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp), - MessageId.earliest, seekBy); + MessageId.earliest, seekBy); }); } @@ -2351,10 +2360,11 @@ public CompletableFuture> getLastMessageIdsAsync() { public CompletableFuture internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil - .failedFuture(new PulsarClientException.AlreadyClosedException( - String.format("The consumer %s was already closed when the subscription %s of the topic %s " - + "getting the last message id", consumerName, subscription, topicName.toString()))); - } + .failedFuture(new PulsarClientException.AlreadyClosedException( + String.format("The consumer %s was already closed when the subscription %s of the topic %s " + + "getting the last message id", consumerName, subscription, + topicName.toString()))); + } AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); Backoff backoff = new BackoffBuilder() @@ -2376,11 +2386,12 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, if (isConnected() && cnx != null) { if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) { future.completeExceptionally( - new PulsarClientException.NotSupportedException( - String.format("The command `GetLastMessageId` is not supported for the protocol version %d. " - + "The consumer is %s, topic %s, subscription %s", - cnx.getRemoteEndpointProtocolVersion(), - consumerName, topicName.toString(), subscription))); + new PulsarClientException.NotSupportedException( + String.format( + "The command `GetLastMessageId` is not supported for the protocol version %d. " + + "The consumer is %s, topic %s, subscription %s", + cnx.getRemoteEndpointProtocolVersion(), + consumerName, topicName.toString(), subscription))); return; } @@ -2399,31 +2410,31 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } if (log.isDebugEnabled()) { log.debug("[{}][{}] Successfully getLastMessageId {}:{}", - topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); } MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ? new MessageIdImpl(lastMessageId.getLedgerId(), - lastMessageId.getEntryId(), lastMessageId.getPartition()) + lastMessageId.getEntryId(), lastMessageId.getPartition()) : new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(), - lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + lastMessageId.getPartition(), lastMessageId.getBatchIndex()); future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("The subscription %s of the topic %s gets the last message id was failed", - subscription, topicName.toString()))); + PulsarClientException.wrap(e.getCause(), + String.format("The subscription %s of the topic %s gets the last message id was failed", + subscription, topicName.toString()))); return null; }); } else { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { future.completeExceptionally( - new PulsarClientException.TimeoutException( - String.format("The subscription %s of the topic %s could not get the last message id " - + "withing configured timeout", subscription, topicName.toString()))); + new PulsarClientException.TimeoutException( + String.format("The subscription %s of the topic %s could not get the last message id " + + "withing configured timeout", subscription, topicName.toString()))); return; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 29d180f5f9a16..ab11675f5434c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -26,7 +27,9 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; @@ -259,4 +262,26 @@ public void testTopicPriorityLevel() { assertThat(consumer.getPriorityLevel()).isEqualTo(1); } + + @Test(invocationTimeOut = 1000) + public void testSeekAsyncInternal() { + // given + ClientCnx cnx = mock(ClientCnx.class); + CompletableFuture clientReq = new CompletableFuture<>(); + when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq); + + consumer.setClientCnx(cnx); + consumer.setState(HandlerState.State.Ready); + + // when + CompletableFuture firstResult = consumer.seekAsync(1L); + CompletableFuture secondResult = consumer.seekAsync(1L); + + clientReq.complete(null); + + // then + assertTrue(firstResult.isDone()); + assertTrue(secondResult.isCancelled()); + verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong()); + } }