Pausing the consumer doesn't work in 2.x versions as before #1376

Closed
arszen123 opened this issue May 27, 2022 · 1 comment · Fixed by #1382
arszen123 commented May 27, 2022

Describe the bug
Calling consumer.pause inside eachMessage or eachBatch and then breaking the consumption flow (by throwing an error) does not stop messages from being delivered for the given topic-partition. The same message batch is received again, even though consumption from that topic-partition is paused.

To Reproduce

  1. Create a new consumer instance.
  2. Connect and subscribe to a given topic.
  3. Create a handler function that pauses consumption from the received topic-partition, sets a timer to resume consumption, and throws an error.
  4. Register a runner with the created handler function.
  5. Observe that the paused topic-partitions are ignored and messages are still received.
  await consumer.connect();
  await consumer.subscribe({ topic });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        doWork(message);
      } catch (e) {
        // Pause the failing topic-partition and schedule a resume after one second.
        consumer.pause([{ topic, partitions: [partition] }]);
        setTimeout(() => consumer.resume([{ topic, partitions: [partition] }]), 1000);

        // Rethrow so that the consumption flow is interrupted.
        throw e;
      }
    },
  });

Expected behavior
When consumption from a particular topic-partition is paused and the consumer flow is broken (by throwing an error inside eachMessage or eachBatch), the consumer should not receive messages from that topic-partition until it is resumed.

Observed behavior
When consumption from a particular topic-partition is paused and the consumer flow is broken (by throwing an error inside eachMessage or eachBatch), the consumer receives the same message batch again, even though consumption has not been resumed for that topic-partition.
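
One way to make the symptom visible while debugging is to compare each delivered message against the list of paused topic-partitions. The sketch below is only an illustration: isPaused is a hypothetical helper, not part of KafkaJS, and it assumes the paused list has the same shape as the argument passed to consumer.pause in the reproduction (an array of { topic, partitions } objects).

```javascript
// Hypothetical helper (not a KafkaJS API): checks whether a topic-partition
// appears in a list shaped like the argument to consumer.pause().
function isPaused(pausedTopicPartitions, topic, partition) {
  return pausedTopicPartitions.some(
    (entry) => entry.topic === topic && entry.partitions.includes(partition)
  );
}

// Hand-written example data; the topic name 'orders' is made up.
const paused = [{ topic: 'orders', partitions: [0, 2] }];

console.log(isPaused(paused, 'orders', 0)); // true
console.log(isPaused(paused, 'orders', 1)); // false
console.log(isPaused(paused, 'payments', 0)); // false
```

Inside eachMessage, a call such as isPaused(consumer.paused(), topic, partition) returning true would indicate that a message was delivered for a partition that should be paused. This assumes consumer.paused() returns the list in the shape described above.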

Environment:

  • OS: [e.g. Windows 10]
  • KafkaJS version [e.g. 2.0.0] (It works with version 1.4.0)
  • NodeJS version [e.g. 14.16.0]

Additional context:
I did some research. As far as I can see, the fetch phase filters the topic-partitions from which messages should be fetched, and then the processing phase begins.
Before 2.0.0, both phases were wrapped inside a retry strategy. Since 2.0.0, only the message processing phase is wrapped inside a retry strategy, so on a retry the active partitions are not checked again.
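
The difference between the two retry scopes can be sketched with a small simulation. This is a simplified model under my own assumptions, not the KafkaJS source: simulate, retryCoversFetch, and maxAttempts are hypothetical names, and the handler always pauses its partition and throws, as in the reproduction.

```javascript
// Simplified model of the two retry scopes. NOT the KafkaJS implementation;
// every name here is hypothetical and exists only for illustration.
function simulate({ retryCoversFetch, maxAttempts = 3 }) {
  const paused = new Set();
  const allPartitions = [0];
  let deliveries = 0; // how many times the handler received the batch

  // Fetch phase: only partitions that are not paused yield a batch.
  const fetch = () => allPartitions.filter((p) => !paused.has(p));

  // Processing phase: the handler pauses the partition and throws.
  const processBatches = (partitions) => {
    for (const p of partitions) {
      deliveries += 1;
      paused.add(p);
      throw new Error('handler failed');
    }
  };

  // When the retry covers only processing, the batch is fetched once up front.
  const fetchedOnce = retryCoversFetch ? null : fetch();

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      // When the retry covers both phases, the paused set is re-checked here.
      const batches = retryCoversFetch ? fetch() : fetchedOnce;
      processBatches(batches);
      break; // nothing threw, so no further retry is needed
    } catch (e) {
      // Swallow the error and let the loop retry.
    }
  }
  return deliveries;
}

console.log(simulate({ retryCoversFetch: true })); // 1
console.log(simulate({ retryCoversFetch: false })); // 3
```

In this model, retrying fetch and processing together delivers the batch once, because the second attempt sees the partition as paused. Retrying only the processing phase hands the same already-fetched batch to the handler on every attempt, which matches the observed behavior.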

Nevon commented Jun 28, 2022

This has been released as part of 2.1.0. Thank you for your contribution! ⭐
