From 262fb6673a1f75d7e22c44b7c3200f136b5b958b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 11 Mar 2020 16:31:52 +0300 Subject: [PATCH 1/2] Fix benchmarks compilation --- .../kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt | 3 ++- .../kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt index 62cc2e5c50..2573d30da0 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt @@ -6,10 +6,11 @@ package benchmarks.flow.scrabble import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow import org.openjdk.jmh.annotations.* -import java.lang.Long.* import java.util.* import java.util.concurrent.* +import kotlin.math.* @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt index 87d0e61232..fa944fac84 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt @@ -5,10 +5,11 @@ package benchmarks.flow.scrabble import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* import java.lang.Long.* import java.util.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) From fc46a98ec4f0a6a36c0e64842ac2d516809ef2c8 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 11 Mar 2020 17:14:29 +0300 Subject: [PATCH 2/2] Detect suspendCancellableCoroutine right after suspendCancellableCoroutineReusable within the same state machine and properly cleanup its child handle when its block completes Fixes #1855 --- .../common/src/CancellableContinuationImpl.kt | 3 ++- .../src/internal/DispatchedContinuation.kt | 20 +++++++++++++++++-- .../ReusableCancellableContinuationTest.kt | 13 ++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 0cc9b57dec..1f67dd3c6c 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -85,12 +85,13 @@ internal open class CancellableContinuationImpl( // This method does nothing. Leftover for binary compatibility with old compiled code } - private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable + private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) /** * Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work. * Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. */ + @JvmName("resetState") // Prettier stack traces internal fun resetState(): Boolean { assert { parentHandle !== NonDisposableHandle } val state = _state.value diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index acb6c48513..50758146af 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -63,8 +63,24 @@ internal class DispatchedContinuation( public val reusableCancellableContinuation: CancellableContinuationImpl<*>? get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> - public val isReusable: Boolean - get() = _reusableCancellableContinuation.value != null + public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { + /* + * Reusability control: + * `null` -> no reusability at all, false + * If current state is not CCI, then we are within `suspendAtomicCancellableCoroutineReusable`, true + * Else, if result is CCI === requester. + * Identity check my fail for the following pattern: + * ``` + * loop: + * suspendAtomicCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle + * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise + * it will leak because it won't be freed by `releaseInterceptedContinuation` + * ``` + */ + val value = _reusableCancellableContinuation.value ?: return false + if (value is CancellableContinuationImpl<*>) return value === requester + return true + } /** * Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block, diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 997f746118..5f5620c632 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -192,4 +192,17 @@ class ReusableCancellableContinuationTest : TestBase() { FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> } finish(3) } + + @Test + fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest { + val channel = produce { + repeat(10) { + send(Unit) + } + } + for (value in channel) { + delay(1) + } + FieldWalker.assertReachableCount(1, coroutineContext[Job], { it is ChildContinuation }) + } }