Merge pull request #1382 from arszen123/fix/consumer-pause-resume-functionality

Fix consumer pause resume functionality
Nevon authored Jun 28, 2022
2 parents ddf4f64 + 214427f commit 51a4947
Showing 3 changed files with 184 additions and 46 deletions.
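For context, the fix concerns the consumer's pause/resume API, which the new tests below exercise by pausing from inside a failing eachMessage handler. A minimal usage sketch of that pattern (broker address, client/group ids, topic name, and the handleMessage helper are illustrative placeholders, not part of this PR):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'example-client', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'example-group' })

// Hypothetical processing step standing in for real work.
const handleMessage = async message => {
  if (!message.value) throw new Error('empty message')
}

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'example-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        await handleMessage(message)
      } catch (e) {
        // Pause the failing topic-partition before rethrowing. While paused,
        // the runner should stop fetching from it; because the offset was
        // never committed, the same message is redelivered after resume().
        consumer.pause([{ topic, partitions: [partition] }])
        setTimeout(() => consumer.resume([{ topic, partitions: [partition] }]), 5000)
        throw e
      }
    },
  })
}

run().catch(console.error)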
133 changes: 133 additions & 0 deletions src/consumer/__tests__/pause.spec.js
@@ -193,6 +193,139 @@ describe('Consumer', () => {
    })
  })

  describe('when pausing and breaking the consumption', () => {
    it('does not process messages when consumption from topic is paused', async () => {
      const [topic] = topics
      const key1 = secureRandom()
      const message1 = { key: `key-${key1}`, value: `value-${key1}`, partition: 0 }
      const messagesConsumed = []
      let shouldThrow = true

      await consumer.connect()
      await producer.connect()

      await producer.send({ acks: 1, topic, messages: [message1] })
      await consumer.subscribe({ topic, fromBeginning: true })

      consumer.run({
        eachMessage: async event => {
          messagesConsumed.push(event)
          if (shouldThrow) {
            consumer.pause([{ topic }])
            throw new Error('Should fail')
          }
        },
      })

      await waitForConsumerToJoinGroup(consumer)

      const consumedMessagesTillError = [
        ...(await waitForMessages(messagesConsumed, { delay: 1000 })),
      ]

      shouldThrow = false
      consumer.resume([{ topic }])

      const consumedMessages = await waitForMessages(messagesConsumed, { number: 2 })

      expect(consumedMessagesTillError).toHaveLength(1)
      expect(consumedMessagesTillError).toEqual([
        expect.objectContaining({
          topic,
          partition: expect.any(Number),
          message: expect.objectContaining({ offset: '0' }),
        }),
      ])
      expect(consumedMessages).toHaveLength(2)
      expect(consumedMessages).toEqual([
        expect.objectContaining({
          topic,
          partition: expect.any(Number),
          message: expect.objectContaining({ offset: '0' }),
        }),
        expect.objectContaining({
          topic,
          partition: expect.any(Number),
          message: expect.objectContaining({ offset: '0' }),
        }),
      ])
    })

    it('does not process messages when consumption from topic-partition is paused', async () => {
      const [topic] = topics
      const pausedPartition = 0
      const key1 = secureRandom()
      const message1 = { key: `key-${key1}`, value: `value-${key1}`, partition: 0 }
      const key2 = secureRandom()
      const message2 = { key: `key-${key2}`, value: `value-${key2}`, partition: 1 }
      const messagesConsumed = []
      let shouldThrow = true

      await consumer.connect()
      await producer.connect()

      await producer.send({ acks: 1, topic, messages: [message1, message2] })
      await consumer.subscribe({ topic, fromBeginning: true })

      consumer.run({
        eachMessage: async event => {
          messagesConsumed.push(event)
          if (shouldThrow && event.partition === pausedPartition) {
            consumer.pause([{ topic, partitions: [pausedPartition] }])
            throw new Error('Should fail')
          }
        },
      })

      await waitForConsumerToJoinGroup(consumer)

      const consumedMessagesTillError = [
        ...(await waitForMessages(messagesConsumed, { number: 2 })),
      ]

      shouldThrow = false
      consumer.resume([{ topic, partitions: [pausedPartition] }])

      const consumedMessages = await waitForMessages(messagesConsumed, { number: 3 })

      expect(consumedMessagesTillError).toHaveLength(2)
      expect(consumedMessagesTillError).toEqual(
        expect.arrayContaining([
          expect.objectContaining({
            topic,
            partition: 0,
            message: expect.objectContaining({ offset: '0' }),
          }),
          expect.objectContaining({
            topic,
            partition: 1,
            message: expect.objectContaining({ offset: '0' }),
          }),
        ])
      )
      expect(consumedMessages).toHaveLength(3)
      expect(consumedMessages).toEqual(
        expect.arrayContaining([
          expect.objectContaining({
            topic,
            partition: 0,
            message: expect.objectContaining({ offset: '0' }),
          }),
          expect.objectContaining({
            topic,
            partition: 0,
            message: expect.objectContaining({ offset: '0' }),
          }),
          expect.objectContaining({
            topic,
            partition: 1,
            message: expect.objectContaining({ offset: '0' }),
          }),
        ])
      )
    })
  })

  describe('when all topics are paused', () => {
    it('does not fetch messages and wait maxWaitTimeInMs per attempt', async () => {
      const consumerCluster = createCluster()
7 changes: 4 additions & 3 deletions src/consumer/__tests__/runner.spec.js
@@ -301,9 +301,10 @@ describe('Consumer > Runner', () => {
      messages: [{ offset: 4, key: '1', value: '2' }],
    })

-    const longRunningRequest = new Promise(resolve => {
-      setTimeout(() => resolve([]), 100)
-    })
+    const longRunningRequest = () =>
+      new Promise(resolve => {
+        setTimeout(() => resolve([]), 100)
+      })

    const error = new Error('Error while processing heartbeats in parallel')
    consumerGroup.heartbeat = async () => {
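The helper change above is subtle but load-bearing: a Promise runs its executor the moment it is constructed and settles exactly once, so a shared instance cannot model a request that should start fresh on every call. Wrapping the construction in a function defers the work until invocation. A self-contained illustration (plain Node, not code from this PR):

// Eager: the timer starts as soon as this line runs, once, ever.
const eager = new Promise(resolve => {
  setTimeout(() => resolve([]), 100)
})

// Lazy: each call starts a new timer and returns a fresh Promise.
const longRunningRequest = () =>
  new Promise(resolve => {
    setTimeout(() => resolve([]), 100)
  })

eager.then(() => console.log('eager settled; later .then() calls reuse this result'))
longRunningRequest().then(() => console.log('first call resolved'))
longRunningRequest().then(() => console.log('second call resolved'))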
90 changes: 47 additions & 43 deletions src/consumer/runner.js
@@ -90,10 +90,25 @@ module.exports = class Runner extends EventEmitter {
    this.scheduleFetchManager()
  }

-  async scheduleFetchManager() {
+  scheduleFetchManager() {
+    if (!this.running) {
+      this.consuming = false
+
+      this.logger.info('consumer not running, exiting', {
+        groupId: this.consumerGroup.groupId,
+        memberId: this.consumerGroup.memberId,
+      })
+
+      return
+    }
+
    this.consuming = true

-    while (this.running) {
+    this.retrier(async (bail, retryCount, retryTime) => {
+      if (!this.running) {
+        return
+      }
+
      try {
        await this.fetchManager.start()
      } catch (e) {
@@ -110,7 +125,7 @@
          })

          await this.consumerGroup.joinAndSync()
-          continue
+          return
        }

        if (e.type === 'UNKNOWN_MEMBER_ID') {
@@ -122,16 +137,37 @@

          this.consumerGroup.memberId = null
          await this.consumerGroup.joinAndSync()
-          continue
+          return
        }

-        this.onCrash(e)
-        break
-      }
-    }
-
-    this.consuming = false
-    this.running = false
+        if (e.name === 'KafkaJSNotImplemented') {
+          return bail(e)
+        }
+
+        if (e.name === 'KafkaJSConnectionError') {
+          return bail(e)
+        }
+
+        this.logger.debug('Error while scheduling fetch manager, trying again...', {
+          groupId: this.consumerGroup.groupId,
+          memberId: this.consumerGroup.memberId,
+          error: e.message,
+          stack: e.stack,
+          retryCount,
+          retryTime,
+        })
+
+        throw e
+      }
+    })
+      .then(() => {
+        this.scheduleFetchManager()
+      })
+      .catch(e => {
+        this.onCrash(e)
+        this.consuming = false
+        this.running = false
+      })
  }

  async stop() {
@@ -404,39 +440,7 @@
      await this.heartbeat()
    }

-    return this.retrier(async (bail, retryCount, retryTime) => {
-      try {
-        await onBatch(batch)
-      } catch (e) {
-        if (!this.running) {
-          this.logger.debug('consumer not running, exiting', {
-            error: e.message,
-            groupId: this.consumerGroup.groupId,
-            memberId: this.consumerGroup.memberId,
-          })
-          return
-        }
-
-        if (
-          isRebalancing(e) ||
-          e.type === 'UNKNOWN_MEMBER_ID' ||
-          e.name === 'KafkaJSNotImplemented'
-        ) {
-          return bail(e)
-        }
-
-        this.logger.debug('Error while fetching data, trying again...', {
-          groupId: this.consumerGroup.groupId,
-          memberId: this.consumerGroup.memberId,
-          error: e.message,
-          stack: e.stack,
-          retryCount,
-          retryTime,
-        })
-
-        throw e
-      }
-    })
+    await onBatch(batch)
  }

  autoCommitOffsets() {
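The core change in runner.js: the old while (this.running) loop drove retries with continue/break from inside a catch block, while the new version wraps one fetch cycle in the retrier, re-checks this.running on each pass, and reschedules itself through the promise chain. A simplified sketch of that control-flow pattern (Scheduler, doWork, the e.fatal flag, and the stub retrier are illustrative assumptions, not KafkaJS's actual createRetry implementation):

class Scheduler {
  // `retrier` is assumed to have the shape retrier(fn) -> Promise, retrying
  // fn when it throws and passing it a bail(err) escape hatch.
  constructor({ retrier, doWork, onCrash }) {
    this.retrier = retrier
    this.doWork = doWork
    this.onCrash = onCrash
    this.running = false
  }

  start() {
    this.running = true
    this.schedule()
  }

  stop() {
    this.running = false
  }

  schedule() {
    // Replaces `break`: once stopped, simply do not reschedule.
    if (!this.running) return

    this.retrier(async bail => {
      if (!this.running) return
      try {
        await this.doWork()
      } catch (e) {
        if (e.fatal) return bail(e) // non-retriable errors escape immediately
        throw e // retriable errors let the retrier back off and try again
      }
    })
      // Replaces `continue`: a successful cycle schedules the next one.
      .then(() => this.schedule())
      .catch(e => {
        this.running = false
        this.onCrash(e)
      })
  }
}

// Minimal wiring with a stub retrier that never retries, only surfaces errors.
const onceRetrier = fn =>
  Promise.resolve().then(() =>
    fn(err => {
      throw err
    })
  )

let cycles = 0
const scheduler = new Scheduler({
  retrier: onceRetrier,
  doWork: async () => {
    if (++cycles >= 3) scheduler.stop() // stop cleanly after three cycles
  },
  onCrash: e => console.error('consumer crashed', e),
})

scheduler.start()

Compared with an async loop, each cycle gets its own retrier invocation, so retry counts reset per cycle and every failure funnels into a single onCrash path.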
