[PIP-?] Optimization of massive delayed message scenarios #15500

Open
JunFu0814 opened this issue May 9, 2022 · 4 comments

Comments

@JunFu0814
Contributor

JunFu0814 commented May 9, 2022

Motivation

Business scene

Delayed messages are a common scenario in business messaging systems. For example, group-opening reminders in group-buying campaigns, automatic deductions when continuous monthly subscriptions expire, and coupon expiration reminders can all be implemented with delayed messages.

Current Mode

Currently Pulsar implements arbitrarily delayed messages with an in-memory time wheel. The Producer can send a mixture of ordinary and delayed messages to a topic, and they are persisted to BookKeeper. Delayed messages are currently supported only in Shared subscription mode; each subscription identifies delayed messages and adds them to its time wheel. The default time granularity of the time wheel is 1 second. The time wheel stores index information for each message: the delivery timestamp, the ledgerId, and the entryId. When the delivery time arrives, the message is read by ledgerId and entryId and sent to the Consumer.
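The indexing described above can be illustrated with a minimal sketch. It is not Pulsar's actual tracker: the class name `DelayedIndexSketch`, its methods, and the choice of a `TreeMap` are assumptions made only for illustration. It keeps one (ledgerId, entryId) pair per delayed message, grouped into buckets of one tick, and releases every bucket whose time has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a time-bucketed delayed-message index.
// This is NOT Pulsar's actual delayed delivery tracker.
class DelayedIndexSketch {
    private final long tickMillis;
    // Bucket time (ms) -> positions {ledgerId, entryId} that become due at that tick.
    private final TreeMap<Long, List<long[]>> buckets = new TreeMap<>();

    DelayedIndexSketch(long tickMillis) {
        this.tickMillis = tickMillis;
    }

    // Index a message. The delivery time is rounded up to the next tick,
    // so a message is never released before its timestamp.
    void add(long deliverAtMillis, long ledgerId, long entryId) {
        long bucket = ((deliverAtMillis + tickMillis - 1) / tickMillis) * tickMillis;
        buckets.computeIfAbsent(bucket, k -> new ArrayList<>())
               .add(new long[] {ledgerId, entryId});
    }

    // Remove and return every position whose bucket time is <= nowMillis.
    List<long[]> pollDue(long nowMillis) {
        NavigableMap<Long, List<long[]>> head = buckets.headMap(nowMillis, true);
        List<long[]> due = new ArrayList<>();
        for (List<long[]> positions : head.values()) {
            due.addAll(positions);
        }
        head.clear(); // the view is backed by the map, so this drops the due buckets
        return due;
    }

    public static void main(String[] args) {
        DelayedIndexSketch index = new DelayedIndexSketch(1000); // 1-second tick
        index.add(1500, 1, 0);
        index.add(5000, 1, 1);
        System.out.println("due at t=2000: " + index.pollDue(2000).size());
    }
}
```

Every indexed message stays in memory until it is due, which is why the number of far-future messages matters for the problems listed next.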

Current Problems

  1. Memory is limited. Each message needs 24 bytes to store 3 long fields, so 10 million delayed messages on one Broker require about 229 MiB (240,000,000 bytes) of off-heap memory.
  2. Ledger deletion is postponed. When delayed and ordinary messages are stored in the same Ledger, a delayed message with a large time span blocks deletion of that Ledger, because unconsumed messages are not cleared. The Ledger then keeps occupying space for a long time, for example when a message is only due to be consumed after 1 month.
  3. Delayed message recovery is expensive. If a topic has a large number of delayed messages, all of them need to be re-indexed when the topic is transferred or the broker goes down.
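The memory figure in item 1 can be checked with a few lines of arithmetic. The class and method names below are made up for this illustration; the only inputs taken from the text are the 3 long fields per message and the count of 10 million messages.

```java
// Hypothetical helper that reproduces the memory estimate from item 1.
class DelayIndexMemoryEstimate {
    // Three long fields per message: timestamp, ledgerId, entryId.
    static final int BYTES_PER_ENTRY = 3 * Long.BYTES;

    static long indexBytes(long delayedMessages) {
        return delayedMessages * BYTES_PER_ENTRY;
    }

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long bytes = indexBytes(10_000_000L);
        System.out.printf("%d bytes = %.1f MiB%n", bytes, toMiB(bytes));
    }
}
```

For 10 million messages this gives 240,000,000 bytes, or about 228.9 MiB, and the cost grows linearly with the number of indexed messages.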

Goal

Optimization ideas

Reduce the number of delayed messages that the time wheel has to hold at any moment. We can use delay levels to divide delayed messages into time ranges, so that we only need to care about the messages that are due soonest. For example, a message with a relatively large time span is first stored in an ordinary topic (the delay level topic), and a separate mechanism then moves each message that is about to become due onto the time wheel as time passes. This greatly reduces the number of delayed messages on the time wheel and thereby mitigates the problems described above.

image

Implementation details

  1. When the Producer sends a message, it calculates from the delay time which partition of the delay level topic the message should be sent to. The delay level topic is an internal partitioned topic. For example, if the maximum delay span of a business is 1 year, you can create a delay level topic with 364 partitions, one per day. partition-0 stores delayed messages with delay >= 1 day and < 2 days, and partition-363 stores delayed messages with delay >= 364 days and < 365 days. If the delay is less than 1 day, the message is sent directly to the real business topic and the time wheel handles it as usual. The partitioning rule of the delay level topic can be determined from the policy and the maximum delay time passed in by the Producer.
  2. A set of scheduled tasks is required to synchronize messages from the delay level topic to the business topic. The synchronization can be understood as a complete consume-and-produce cycle, and messages are persisted again during it. Normally this capability could be provided by the Broker; to avoid overloading the Broker, Pulsar Functions can be used to perform the synchronization instead.
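The routing rule in step 1 can be sketched as a small pure function. The class `DelayLevelRouter` and the method `partitionFor` are hypothetical names, not part of any existing Pulsar API, and rejecting delays beyond the last partition is an assumption, since the proposal does not say how such messages should be handled.

```java
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of per-day routing into the delay level topic.
class DelayLevelRouter {
    static final long DAY_MILLIS = TimeUnit.DAYS.toMillis(1);

    // Returns the partition index for a delay, or empty when the message
    // should go straight to the business topic (delay shorter than 1 day).
    static OptionalInt partitionFor(long delayMillis, int numPartitions) {
        if (delayMillis < 0 || numPartitions <= 0) {
            throw new IllegalArgumentException("delay and partition count must be valid");
        }
        long wholeDays = delayMillis / DAY_MILLIS;
        if (wholeDays < 1) {
            return OptionalInt.empty();
        }
        long index = wholeDays - 1; // partition-0 holds delays in [1 day, 2 days)
        if (index >= numPartitions) {
            // Assumption: delays beyond the configured maximum are rejected.
            throw new IllegalArgumentException("delay exceeds the configured maximum");
        }
        return OptionalInt.of((int) index);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(TimeUnit.HOURS.toMillis(12), 364)); // business topic
        System.out.println(partitionFor(TimeUnit.DAYS.toMillis(1), 364));   // first partition
        System.out.println(partitionFor(TimeUnit.DAYS.toMillis(364), 364)); // last partition
    }
}
```

With 364 partitions, a delay of exactly 1 day maps to partition-0 and a delay of 364 days maps to partition-363, matching the example in step 1.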

Questions

  1. The delay level topic is created automatically by the client, so its data retention mechanism needs to be considered.
  2. Life cycle management of the delay level topic. When the main business topic is deleted, does the delay level topic need to be deleted at the same time? How should delayed messages that have not yet been delivered be handled in that case?
  3. If the user specifies an unreasonable policy and maximum delay time on the client side, the delay level topic may end up with too many partitions.
  4. Is there a scenario in which the partitions of the delay level topic need to be expanded dynamically?
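One possible guard for question 3 is to derive the partition count from the maximum delay and reject configurations that exceed an upper bound. The sketch below is an assumption, not part of the proposal: the names `DelayLevelPartitionCount` and `Unit` and the `maxPartitions` cap are invented for illustration, and only the fixed-length units DAY and HOUR are covered because the length of a month or year in seconds is not fixed.

```java
// Hypothetical sketch: derive the partition count and cap it.
class DelayLevelPartitionCount {
    enum Unit {
        DAY(24L * 60 * 60),
        HOUR(60L * 60);

        final long seconds;

        Unit(long seconds) {
            this.seconds = seconds;
        }
    }

    // One partition per unit, excluding the first unit, which is served
    // directly by the business topic (365 days -> 364 partitions).
    static int partitionCount(long maxDelaySeconds, Unit unit, int maxPartitions) {
        if (maxDelaySeconds <= 0 || maxPartitions <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        long units = (maxDelaySeconds + unit.seconds - 1) / unit.seconds; // round up
        long count = Math.max(0, units - 1);
        if (count > maxPartitions) {
            throw new IllegalArgumentException(
                    count + " partitions would exceed the limit of " + maxPartitions);
        }
        return (int) count;
    }

    public static void main(String[] args) {
        long oneYearSeconds = 365L * 24 * 60 * 60;
        System.out.println(partitionCount(oneYearSeconds, Unit.DAY, 1000));
    }
}
```

A maximum delay of 365 days partitioned by day yields 364 partitions, while the same span partitioned by hour would need 8,759 partitions and is rejected under a cap of 1000.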

Proposed Changes

Client API

  • add fields #delayLevelTopicEnabled
    #delayLevelTopicName #partitionStrategy in ProducerConfigurationData.java
    //for delay level topic
    private boolean delayLevelTopicEnabled = false;

    private String delayLevelTopicName = null;

    private DelayLevelTopicPartitionStrategy partitionStrategy = new DelayLevelTopicPartitionStrategy();
  • add class DelayLevelTopicPartitionStrategy.java
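public class DelayLevelTopicPartitionStrategy {

    // Maximum delay a message may have, in seconds (default: 30 days).
    private long messageMaximumDelayTimeSeconds = 60 * 60 * 24 * 30;

    // Time unit covered by each partition of the delay level topic.
    private PartitionBy partitionBy = PartitionBy.DAY;

    public enum PartitionBy {
        YEAR, MONTH, DAY, HOUR
    }

    // Keeps the defaults above.
    public DelayLevelTopicPartitionStrategy() {
    }

    public DelayLevelTopicPartitionStrategy(long messageMaximumDelayTimeSeconds, PartitionBy partitionBy) {
        this.messageMaximumDelayTimeSeconds = messageMaximumDelayTimeSeconds;
        this.partitionBy = partitionBy;
    }
}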
  • add methods #enableDelayLevelTopic(boolean delayLevelTopicEnabled) #delayLevelTopic(String delayLevelTopicName) #delayLevelTopicPartitionStrategy(DelayLevelTopicPartitionStrategy strategy) in ProducerBuilder.java
    /**
     * Determines whether delayed messages are first routed to a delay level topic.
     * Turn on this option to get better performance in scenarios with a large number of delayed messages and long delays.
     * Disabled by default.
     *
     * @param delayLevelTopicEnabled whether to enable the delay level topic
     * @return the producer builder instance
     */
    ProducerBuilder<T> enableDelayLevelTopic(boolean delayLevelTopicEnabled);

    /**
     * The name of the delay level topic can be specified.
     * If not specified, it will be generated based on the current topic name.
     *
     * @param delayLevelTopicName the name of the delay level topic
     * @return the producer builder instance
     */
    ProducerBuilder<T> delayLevelTopic(String delayLevelTopicName);

    /**
     * Partition strategy for delay level topic.
     * We will decide how many partitions this topic will be divided into according to the
     * messageMaximumDelayTimeSeconds and partitionBy.
     *
     * @param strategy the strategy of delay level topic partitioned by
     * @return the producer builder instance
     */
    ProducerBuilder<T> delayLevelTopicPartitionStrategy(DelayLevelTopicPartitionStrategy strategy);
  • Specify these configurations when creating the Producer
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
    builder.enableDelayLevelTopic(true)
            .delayLevelTopic("persistent://public/default/delay-level-topic-test")
            .delayLevelTopicPartitionStrategy(new DelayLevelTopicPartitionStrategy(
                    60 * 60 * 24 * 30, DelayLevelTopicPartitionStrategy.PartitionBy.DAY));

Broker

TODO

Functions

TODO: Periodically synchronize messages from the delay level topic to the business topic
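Since this part is still open, the following is only a sketch of the decision such a synchronization task might make for each message. The class `DelayLevelSyncSketch`, the `Action` enum, and the window parameter are assumptions for illustration. A message is forwarded once its remaining delay drops below the window that the business topic handles directly (1 day in the example above); the forwarded copy could then be published with the existing `TypedMessageBuilder#deliverAt(long)` so that the time wheel of the business topic takes over.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the per-message decision in the sync task.
class DelayLevelSyncSketch {
    enum Action {
        FORWARD_TO_BUSINESS_TOPIC,
        KEEP_IN_DELAY_LEVEL_TOPIC
    }

    // Forward when the remaining delay is shorter than the window
    // that the business topic's time wheel is expected to cover.
    static Action decide(long deliverAtMillis, long nowMillis, long windowMillis) {
        long remaining = deliverAtMillis - nowMillis;
        return remaining < windowMillis
                ? Action.FORWARD_TO_BUSINESS_TOPIC
                : Action.KEEP_IN_DELAY_LEVEL_TOPIC;
    }

    public static void main(String[] args) {
        long window = TimeUnit.DAYS.toMillis(1);
        long now = System.currentTimeMillis();
        System.out.println(decide(now + TimeUnit.HOURS.toMillis(23), now, window));
        System.out.println(decide(now + TimeUnit.DAYS.toMillis(3), now, window));
    }
}
```

How a message that is kept stays available for a later pass (for example by not acknowledging it) is left open here, as it is in the proposal.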

@JunFu0814 JunFu0814 changed the title Optimization of massive delayed message scenarios [PIP] Optimization of massive delayed message scenarios May 9, 2022
@JunFu0814 JunFu0814 changed the title [PIP] Optimization of massive delayed message scenarios [PIP-156] Optimization of massive delayed message scenarios May 9, 2022
@hpvd

hpvd commented May 9, 2022

there is already another PIP with this PIP-number:
PIP-156: Build and Run Pulsar Server on Java 17
#15207

please see https://github.com/apache/pulsar/wiki/Pulsar-Improvement-Proposal-%28PIP%29

@gaozhangmin gaozhangmin changed the title [PIP-156] Optimization of massive delayed message scenarios [PIP-157] Optimization of massive delayed message scenarios May 10, 2022
@hpvd

hpvd commented May 10, 2022

sorry for bothering again, just upcounting 1 doesn't do the trick: There is already PIP 157, see PIP-157: Bucketing topic metadata to allow more topics per namespace #15254
@lhotari could you help?

@lhotari lhotari changed the title [PIP-157] Optimization of massive delayed message scenarios [PIP-?] Optimization of massive delayed message scenarios May 20, 2022
@github-actions

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jun 20, 2022
@netudima

it looks like addressed by #16763
