This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] deadlock in kop #181

Closed
aloyszhang opened this issue Sep 22, 2020 · 4 comments · Fixed by #182

Comments

@aloyszhang
Contributor

Describe the bug
KoP has a thread deadlock problem.

Found one Java-level deadlock:

=============================
"pulsar-io-23-48":
  waiting for ownable synchronizer 0x0000000606643fc0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "pulsar-io-23-14"
"pulsar-io-23-14":
  waiting to lock monitor 0x00007f9bd2a71f28 (object 0x0000000606cd77a8, a io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList),
  which is held by "ExpirationReaper-group-coordinator-delayed-join"
"ExpirationReaper-group-coordinator-delayed-join":
  waiting to lock monitor 0x00007f9bc861b478 (object 0x000000067c9f4240, a io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat),
  which is held by "pulsar-io-23-14"

Expected behavior
The deadlock should not happen.

Additional context
The jstack output follows:

"pulsar-io-23-48":
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000606643fc0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
	at io.streamnative.pulsar.handlers.kop.utils.CoreUtils.inLock(CoreUtils.java:33)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.inLock(GroupMetadata.java:210)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.java:837)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$null$35(GroupCoordinator.java:780)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1585/1479658866.apply(Unknown Source)
	at java.util.Optional.map(Optional.java:215)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$handleCommitOffsets$38(GroupCoordinator.java:779)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1584/792918900.get(Unknown Source)
	at java.util.Optional.orElseGet(Optional.java:267)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.java:777)
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleOffsetCommitRequest(KafkaRequestHandler.java:959)
	at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
"pulsar-io-23-14":
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList.remove(TimerTaskList.java:106)
	- waiting to lock <0x0000000606cd77a8> (a io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList)
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList$TimerTaskEntry.remove(TimerTaskList.java:184)
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTask.cancel(TimerTask.java:32)
	- locked <0x000000067c9f4240> (a io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation.forceComplete(DelayedOperation.java:66)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat.lambda$tryComplete$0(DelayedHeartbeat.java:55)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat$$Lambda$1389/300546558.get(Unknown Source)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$tryCompleteHeartbeat$66(GroupCoordinator.java:1281)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1390/845845689.get(Unknown Source)
	at io.streamnative.pulsar.handlers.kop.utils.CoreUtils.inLock(CoreUtils.java:35)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.inLock(GroupMetadata.java:210)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.java:1278)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.java:55)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation.maybeTryComplete(DelayedOperation.java:120)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:352)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:229)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.java:1072)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$doCommitOffsets$45(GroupCoordinator.java:861)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1588/2144613179.get(Unknown Source)
	at io.streamnative.pulsar.handlers.kop.utils.CoreUtils.inLock(CoreUtils.java:35)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.inLock(GroupMetadata.java:210)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.java:837)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$null$35(GroupCoordinator.java:780)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1585/1479658866.apply(Unknown Source)
	at java.util.Optional.map(Optional.java:215)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.lambda$handleCommitOffsets$38(GroupCoordinator.java:779)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator$$Lambda$1584/792918900.get(Unknown Source)
	at java.util.Optional.orElseGet(Optional.java:267)
	at io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.java:777)
	at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleOffsetCommitRequest(KafkaRequestHandler.java:959)
	at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:250)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
"ExpirationReaper-group-coordinator-delayed-join":
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTask.getTimerTaskEntry(TimerTask.java:47)
	- waiting to lock <0x000000067c9f4240> (a io.streamnative.pulsar.handlers.kop.coordinator.group.DelayedHeartbeat)
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList$TimerTaskEntry.cancelled(TimerTaskList.java:174)
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimingWheel.add(TimingWheel.java:161)
	at io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer.addTimerTaskEntry(SystemTimer.java:142)
	at io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer.lambda$new$0(SystemTimer.java:126)
	at io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer$$Lambda$446/18133507.accept(Unknown Source)
	at io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList.flush(TimerTaskList.java:125)
	- locked <0x0000000606cd77a8> (a io.streamnative.pulsar.handlers.kop.utils.timer.TimerTaskList)
	at io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer.advanceClock(SystemTimer.java:159)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory.advanceClock(DelayedOperationPurgatory.java:399)
	at io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory$1.doWork(DelayedOperationPurgatory.java:141)
	at io.streamnative.pulsar.handlers.kop.utils.ShutdownableThread.run(ShutdownableThread.java:104)

Found 1 deadlock.

@aloyszhang
Contributor Author

aloyszhang commented Sep 22, 2020

As shown in the jstack output, this problem is caused by the synchronized modifier on TimerTask#getTimerTaskEntry.
I think it should be removed.
@sijie PTAL.
I will open a pull request if you agree.

@sijie
Member

sijie commented Sep 22, 2020

@aloyszhang sounds good to me! Thank you for working on the pull request!

@BewareMyPower
Collaborator

BewareMyPower commented Sep 22, 2020

As I see it, the deadlock occurred as follows:

When the DelayedOperation, which extends TimerTask, called cancel, it acquired the lock on the TimerTask and then called TimerTaskEntry#remove. That in turn called TimerTaskList#remove, which tried to acquire the lock on the TimerTaskList.

Meanwhile, another thread called TimerTaskList#flush and acquired the lock on the TimerTaskList itself. The lambda passed to flush checked whether the timer task entry was cancelled by calling TimerTaskEntry#cancelled, which called TimerTask#getTimerTaskEntry and tried to acquire the lock on the TimerTask.
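
To make the cycle concrete, here is a minimal sketch of the same lock-order inversion. It is not the KoP code: the class and method names are made up for illustration, and ReentrantLock with tryLock stands in for the synchronized monitors so that the second acquisition fails instead of blocking forever.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the two monitors seen in the jstack output.
public class LockOrderInversionSketch {
    static final ReentrantLock taskLock = new ReentrantLock(); // plays the TimerTask monitor
    static final ReentrantLock listLock = new ReentrantLock(); // plays the TimerTaskList monitor

    /** Returns {cancelPathGotListLock, flushPathGotTaskLock}. */
    static boolean[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean cancelGotList = new AtomicBoolean(true);
        AtomicBoolean flushGotTask = new AtomicBoolean(true);

        // "cancel" path: task lock first, then list lock.
        Thread cancel = new Thread(() -> acquireInOrder(taskLock, listLock, barrier, cancelGotList));
        // "flush" path: list lock first, then task lock.
        Thread flush = new Thread(() -> acquireInOrder(listLock, taskLock, barrier, flushGotTask));
        cancel.start();
        flush.start();
        try {
            cancel.join();
            flush.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {cancelGotList.get(), flushGotTask.get()};
    }

    static void acquireInOrder(ReentrantLock first, ReentrantLock second,
                               CyclicBarrier barrier, AtomicBoolean gotSecond) {
        first.lock();
        try {
            barrier.await(); // both threads now hold their first lock
            boolean acquired = second.tryLock(); // a monitor would block here forever
            gotSecond.set(acquired);
            if (acquired) {
                second.unlock();
            }
            barrier.await(); // keep the first lock until the other thread has tried too
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("cancel path got list lock: " + result[0]);
        System.out.println("flush path got task lock: " + result[1]);
    }
}
```

Both values come out false: each thread holds the lock the other one needs. With synchronized in place of tryLock, neither thread could back off, which matches the two monitor waits in the dump.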


In any case, Kafka's original implementation of TimerTask#getTimerTaskEntry doesn't hold the lock, so removing the synchronized should be safe.

But could you confirm that removing it won't introduce a thread visibility problem? I mean, is it necessary to add a volatile qualifier to timerTaskEntry here? Kafka's implementation doesn't add volatile, though.

@aloyszhang
Contributor Author

aloyszhang commented Sep 23, 2020

@BewareMyPower Thanks for your suggestions. I agree that we should add volatile so that a call to TimerTask#getTimerTaskEntry does not miss any update to timerTaskEntry.
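
A rough sketch of the idea, assuming a simplified task class: the names below (VolatileEntrySketch, Task, Entry, readWhileMonitorHeld) are hypothetical and this is not the actual KoP TimerTask. The getter takes no lock, and the field is volatile so readers on other threads see the latest write.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model of the proposed change; not the KoP source.
public class VolatileEntrySketch {
    static final class Entry {
        final String name;
        Entry(String name) { this.name = name; }
    }

    static class Task {
        // volatile: an unsynchronized reader still observes the most recent write.
        private volatile Entry entry;

        synchronized void setEntry(Entry e) { entry = e; }

        synchronized void cancel() { entry = null; }

        // No synchronized here, so a caller holding another lock cannot block on this monitor.
        Entry getEntry() { return entry; }
    }

    /** Reads the entry while another thread holds the task's monitor. */
    static Entry readWhileMonitorHeld(Task task) {
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            synchronized (task) {
                held.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        holder.start();
        try {
            held.await();           // the monitor is now held by the other thread
            return task.getEntry(); // would hang here if the getter were synchronized
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        task.setEntry(new Entry("heartbeat"));
        System.out.println(readWhileMonitorHeld(task).name);
    }
}
```

With this shape, the flush path can read the entry while the cancel path holds the task monitor, so the cycle in the dump cannot form through the getter.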

jiazhai pushed a commit that referenced this issue Sep 23, 2020
fix #181
This pull request fixes the deadlock in KoP.