Describe the bug
Calling consumer.pause inside eachMessage or eachBatch and then breaking the consumption flow (by throwing an error) does not stop delivery from the given topic-partition. The same message batch is received again, even though consumption from that topic-partition has been paused.
To Reproduce
Create a new consumer instance.
Connect and subscribe to a given topic.
Create a handler function that pauses consumption from the received topic-partition, sets a timer to resume the consumption, and throws an error.
Register a runner with the created handler function.
Observe that the pause is ignored: messages are still received from the paused topic-partitions.
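The steps above can be sketched roughly as follows. This is a minimal reproduction sketch, not the exact code from my project: the helper name `createPausingHandler`, the client id, group id, topic name, the 30-second resume delay, and the `KAFKA_BROKERS` environment variable are all placeholders I made up for illustration. The KafkaJS wiring only runs when `KAFKA_BROKERS` is set.

```javascript
// Builds an eachMessage handler that pauses the received topic-partition,
// schedules a resume, and then throws to break the consumption flow.
// `consumer` only needs pause() and resume() in the KafkaJS shape.
function createPausingHandler(consumer, resumeAfterMs) {
  return async ({ topic, partition }) => {
    const target = [{ topic, partitions: [partition] }]
    consumer.pause(target)
    setTimeout(() => consumer.resume(target), resumeAfterMs)
    throw new Error(`Paused ${topic}[${partition}] for ${resumeAfterMs} ms`)
  }
}

// Wires the handler into a real KafkaJS consumer (placeholder names throughout).
async function reproduce(brokers) {
  const { Kafka } = require('kafkajs')
  const kafka = new Kafka({ clientId: 'pause-repro', brokers })
  const consumer = kafka.consumer({ groupId: 'pause-repro-group' })

  await consumer.connect()
  await consumer.subscribe({ topic: 'pause-repro-topic', fromBeginning: true })

  const handler = createPausingHandler(consumer, 30000)
  await consumer.run({
    eachMessage: async (payload) => {
      // Log every delivery so repeated deliveries of the same offset are visible.
      console.log('received', payload.topic, payload.partition, payload.message.offset)
      await handler(payload)
    },
  })
}

// Only talk to a broker when one is configured, e.g. KAFKA_BROKERS=localhost:9092
if (process.env.KAFKA_BROKERS) {
  reproduce(process.env.KAFKA_BROKERS.split(',')).catch(console.error)
}
```

With the behavior described in this issue, the log line keeps printing the same offset for the paused partition before the resume timer has fired.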
Expected behavior
When consumption from a particular topic-partition is paused and the consumer is then interrupted (by throwing an error inside eachMessage or eachBatch), the consumer should not receive messages from that topic-partition until it is resumed.
Observed behavior
When consumption from a particular topic-partition is paused and the consumer is then interrupted (by throwing an error inside eachMessage or eachBatch), the consumer receives the same message batch again, even though consumption has not been resumed for that topic-partition.
Environment:
OS: [e.g. Windows 10]
KafkaJS version [e.g. 2.0.0] (It works with version 1.4.0)
NodeJS version [e.g. 14.16.0]
Additional context:
I did some research. As far as I can see, the fetch phase first filters the topic-partitions that messages should be fetched from, and then the processing phase begins.
Before 2.0.0, both phases were wrapped inside a retry strategy. Since 2.0.0, only the message processing phase is wrapped inside a retry strategy, so on a retry the active partitions are not checked again and the already-fetched batch is processed once more.
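To illustrate what I mean, here is a toy model of the two structures. It is not KafkaJS source code; `makeToyConsumer`, `retry`, `runFetchInsideRetry`, and `runFetchOutsideRetry` are names I invented, and the real retry logic is more involved. It only shows why moving the fetch out of the retried block would lead to the same batch being delivered again.

```javascript
// Toy consumer: fetch() skips paused partitions, like the filtering in the fetch phase.
function makeToyConsumer() {
  const paused = new Set()
  return {
    pause: (partition) => paused.add(partition),
    fetch: (partitions) =>
      partitions.filter((p) => !paused.has(p)).map((p) => ({ partition: p })),
  }
}

// Calls fn up to `attempts` times, retrying whenever it throws.
function retry(fn, attempts) {
  for (let i = 0; i < attempts; i++) {
    try {
      return fn()
    } catch (err) {
      // swallow and retry
    }
  }
}

// Assumed pre-2.0.0 shape: fetch and processing are both inside the retried block,
// so every retry re-evaluates which partitions are paused.
function runFetchInsideRetry(consumer, partitions, handler, attempts) {
  const delivered = []
  retry(() => {
    for (const batch of consumer.fetch(partitions)) {
      delivered.push(batch.partition)
      handler(batch)
    }
  }, attempts)
  return delivered
}

// Assumed 2.0.0 shape: fetch happens once, only processing is retried,
// so the already-fetched batch is handed to the handler on every retry.
function runFetchOutsideRetry(consumer, partitions, handler, attempts) {
  const delivered = []
  const batches = consumer.fetch(partitions)
  retry(() => {
    for (const batch of batches) {
      delivered.push(batch.partition)
      handler(batch)
    }
  }, attempts)
  return delivered
}

// Handler that pauses the partition and then throws, as in the reproduction steps.
const pauseAndThrow = (consumer) => (batch) => {
  consumer.pause(batch.partition)
  throw new Error('break the consumption flow')
}

const a = makeToyConsumer()
console.log(runFetchInsideRetry(a, [0], pauseAndThrow(a), 3)) // [ 0 ]

const b = makeToyConsumer()
console.log(runFetchOutsideRetry(b, [0], pauseAndThrow(b), 3)) // [ 0, 0, 0 ]
```

In the first variant the paused partition is delivered once; in the second it is delivered on every retry, which matches what I observe with 2.0.0.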