Skip to content

Commit

Permalink
[PIP-130] Apply redelivery backoff policy for ack timeout (#13707)
Browse files Browse the repository at this point in the history
### Motivation

This pull request is the specific implementation plan of PIP-130

### Modifications

*Apply the message redelivery policy for the ack timeout.*
*Alert NegativeAckBackoff  interface to  RedeliveryBackoff.*
*Expose AckTimeoutRedeliveryBackoff in ConsumerBulider.*
  • Loading branch information
liudezhi2098 authored Jan 27, 2022
1 parent 5bb4b75 commit 74b5d50
Show file tree
Hide file tree
Showing 21 changed files with 946 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions,
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(subscriptionType)
.negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
.minNackTimeMs(minNackTimeMs)
.maxNackTimeMs(maxNackTimeMs)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(minNackTimeMs)
.maxDelayMs(maxNackTimeMs)
.build())
.ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
.subscribe();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -376,20 +376,36 @@ public SubscriptionInitialPosition getSubscriptionInitialPosition(){
}

/**
* @return the configured {@link NegativeAckRedeliveryBackoff} for the consumer
* @return the configured {@link RedeliveryBackoff} for the consumer
*/
public NegativeAckRedeliveryBackoff getNegativeAckRedeliveryBackoff() {
public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
return conf.getNegativeAckRedeliveryBackoff();
}

/**
* @param negativeAckRedeliveryBackoff the negative ack redelivery backoff policy.
* Default value is: NegativeAckRedeliveryExponentialBackoff
* Default value is: MultiplierRedeliveryBackoff
* @return the {@link ConsumerConfiguration}
*/
public ConsumerConfiguration setNegativeAckRedeliveryBackoff(
NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff) {
public ConsumerConfiguration setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) {
conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
return this;
}

/**
* @return the configured {@link RedeliveryBackoff} for the consumer
*/
public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
return conf.getAckTimeoutRedeliveryBackoff();
}

/**
* @param ackTimeoutRedeliveryBackoff redelivery backoff policy for ack timeout.
* Default value is: MultiplierRedeliveryBackoff
* @return the {@link ConsumerConfiguration}
*/
public ConsumerConfiguration setAckTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -766,13 +766,28 @@ public interface ConsumerBuilder<T> extends Cloneable {
*
* <p>Example:
* <pre>
* client.newConsumer().negativeAckRedeliveryBackoff(NegativeAckRedeliveryExponentialBackoff.builder()
* client.newConsumer().negativeAckRedeliveryBackoff(ExponentialRedeliveryBackoff.builder()
* .minNackTimeMs(1000)
* .maxNackTimeMs(60 * 1000)
* .build()).subscribe();
* </pre>
*/
ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff);
ConsumerBuilder<T> negativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff);

/**
* Notice: the redeliveryBackoff will not work with `consumer.negativeAcknowledge(MessageId messageId)`
* because we are not able to get the redelivery count from the message ID.
*
* <p>Example:
* <pre>
* client.newConsumer().ackTimeout(10, TimeUnit.SECOND)
* .ackTimeoutRedeliveryBackoff(ExponentialRedeliveryBackoff.builder()
* .minNackTimeMs(1000)
* .maxNackTimeMs(60 * 1000)
* .build()).subscribe();
* </pre>
*/
ConsumerBuilder<T> ackTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff);

/**
* Start the consumer in a paused state. When enabled, the consumer does not immediately fetch messages when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* Interface for custom message is negativeAcked policy, users can specify a {@link NegativeAckRedeliveryBackoff} for
* Interface for custom message is negativeAcked policy, users can specify a {@link RedeliveryBackoff} for
* a consumer.
*
* Notice: the consumer crashes will trigger the redelivery of the unacked message, this case will not respect the
* {@link NegativeAckRedeliveryBackoff}, which means the message might get redelivered earlier than the delay time
* {@link RedeliveryBackoff}, which means the message might get redelivered earlier than the delay time
* from the backoff.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NegativeAckRedeliveryBackoff extends Serializable {
public interface RedeliveryBackoff extends Serializable {
/**
* @param redeliveryCount indicates the number of times the message was redelivered
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand Down Expand Up @@ -502,12 +502,19 @@ public ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloa
}

@Override
public ConsumerBuilder<T> negativeAckRedeliveryBackoff(NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff) {
public ConsumerBuilder<T> negativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) {
checkArgument(negativeAckRedeliveryBackoff != null, "negativeAckRedeliveryBackoff must not be null.");
conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
return this;
}

@Override
public ConsumerBuilder<T> ackTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
checkArgument(ackTimeoutRedeliveryBackoff != null, "ackTimeoutRedeliveryBackoff must not be null.");
conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
return this;
}

@Override
public ConsumerBuilder<T> startPaused(boolean paused) {
conf.setStartPaused(paused);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
duringSeek = new AtomicBoolean(false);

if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
Math.min(conf.getTickDurationMillis(), conf.getAckTimeoutMillis()));
if (conf.getAckTimeoutRedeliveryBackoff() != null) {
this.unAckedMessageTracker = new UnAckedMessageRedeliveryTracker(client, this, conf);
} else {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf);
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
Expand Down Expand Up @@ -1202,7 +1201,8 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
return;
}

ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, headersAndPayload, cnx);
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
cnx);

boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);

Expand Down Expand Up @@ -1510,11 +1510,15 @@ protected synchronized void messageProcessed(Message<?> msg) {

protected void trackMessage(Message<?> msg) {
if (msg != null) {
trackMessage(msg.getMessageId());
trackMessage(msg.getMessageId(), msg.getRedeliveryCount());
}
}

protected void trackMessage(MessageId messageId) {
trackMessage(messageId, 0);
}

protected void trackMessage(MessageId messageId, int redeliveryCount) {
if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
MessageIdImpl id = (MessageIdImpl) messageId;
if (id instanceof BatchMessageIdImpl) {
Expand All @@ -1526,7 +1530,7 @@ protected void trackMessage(MessageId messageId) {
// we should no longer track this message, TopicsConsumer will take care from now onwards
unAckedMessageTracker.remove(id);
} else {
unAckedMessageTracker.add(id);
unAckedMessageTracker.add(id, redeliveryCount);
}
}
}
Expand Down Expand Up @@ -1570,8 +1574,8 @@ public long getLastDisconnectedTimestamp() {
return connectionHandler.lastConnectionClosedTimestamp;
}

private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
ClientCnx currentCnx) {
private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata,
ByteBuf payload, ClientCnx currentCnx) {

if (msgMetadata.getEncryptionKeysCount() == 0) {
return payload.retain();
Expand All @@ -1597,7 +1601,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, MessageMetadata
"[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not"
+ " implemented to consume encrypted message",
topic, subscription, consumerName, m);
unAckedMessageTracker.add(m);
unAckedMessageTracker.add(m, redeliveryCount);
return null;
}
}
Expand Down Expand Up @@ -1630,7 +1634,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, MessageMetadata
log.error(
"[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message",
topic, subscription, consumerName, m);
unAckedMessageTracker.add(m);
unAckedMessageTracker.add(m, redeliveryCount);
return null;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.paused = conf.isStartPaused();

if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(
client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
if (conf.getAckTimeoutRedeliveryBackoff() != null) {
this.unAckedMessageTracker = new UnAckedTopicMessageRedeliveryTracker(client, this, conf);
} else {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(
client, this, conf.getAckTimeoutMillis());
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf);
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
Expand Down Expand Up @@ -304,7 +302,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId());
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
completePendingReceive(receivedFuture, topicMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
Expand All @@ -317,7 +315,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {

@Override
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
unAckedMessageTracker.add(msg.getMessageId(), msg.getRedeliveryCount());
decreaseIncomingMessageSize(msg);
}

Expand All @@ -343,7 +341,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
message = incomingMessages.take();
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (Exception e) {
Expand All @@ -359,7 +357,7 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
if (message != null) {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
Expand Down Expand Up @@ -427,7 +425,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
} else {
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import org.apache.pulsar.client.api.RedeliveryBackoff;

/**
* MultiplierRedeliveryBackoff.
*/
public class MultiplierRedeliveryBackoff implements RedeliveryBackoff {

private final long minDelayMs;
private final long maxDelayMs;
private final double multiplier;
private final int maxMultiplierPow;

private MultiplierRedeliveryBackoff(long minDelayMs, long maxDelayMs, double multiplier) {
this.minDelayMs = minDelayMs;
this.maxDelayMs = maxDelayMs;
this.multiplier = multiplier;
maxMultiplierPow = (int) (Math.log((double) maxDelayMs / minDelayMs) / Math.log(multiplier)) + 1;
}

public static MultiplierRedeliveryBackoff.MultiplierRedeliveryBackoffBuilder builder() {
return new MultiplierRedeliveryBackoff.MultiplierRedeliveryBackoffBuilder();
}

public long getMinDelayMs() {
return this.minDelayMs;
}

public long getMaxDelayMs() {
return this.maxDelayMs;
}

@Override
public long next(int redeliveryCount) {
if (redeliveryCount <= 0 || minDelayMs <= 0) {
return this.minDelayMs;
}
if (redeliveryCount > maxMultiplierPow) {
return this.maxDelayMs;
}
return Math.min((long) (minDelayMs * Math.pow(multiplier, redeliveryCount)), this.maxDelayMs);
}

/**
* Builder of MultiplierRedeliveryBackoff.
*/
public static class MultiplierRedeliveryBackoffBuilder {
private long minDelayMs = 1000 * 10;
private long maxDelayMs = 1000 * 60 * 10;
private double multiplier = 2.0;

public MultiplierRedeliveryBackoffBuilder minDelayMs(long minDelayMs) {
this.minDelayMs = minDelayMs;
return this;
}

public MultiplierRedeliveryBackoffBuilder maxDelayMs(long maxDelayMs) {
this.maxDelayMs = maxDelayMs;
return this;
}

public MultiplierRedeliveryBackoffBuilder multiplier(double multiplier) {
this.multiplier = multiplier;
return this;
}

public MultiplierRedeliveryBackoff build() {
checkArgument(minDelayMs >= 0, "min delay time must be >= 0");
checkArgument(maxDelayMs >= minDelayMs, "maxDelayMs must be >= minDelayMs");
checkArgument(multiplier > 1, "multiplier must be > 1");
return new MultiplierRedeliveryBackoff(minDelayMs, maxDelayMs, multiplier);
}
}
}
Loading

0 comments on commit 74b5d50

Please sign in to comment.