From 8d7c58163f52e26a42a0d06f42b51a4e08eb3fd4 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 16 Sep 2019 17:55:33 +0300 Subject: [PATCH 1/3] Take benchmark --- .../kotlin/benchmarks/flow/TakeBenchmark.kt | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt new file mode 100644 index 0000000000..be38f12b55 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* +import java.util.concurrent.CancellationException +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* +import benchmarks.flow.scrabble.flow as unsafeFlow + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +open class TakeBenchmark { + @Param("1", "10", "100", "1000") + private var size: Int = 0 + + private suspend inline fun Flow.consume() = + filter { it % 2L != 0L } + .map { it * it }.count() + + @Benchmark + fun baseline() = runBlocking { + (0L until size).asFlow().consume() + } + + @Benchmark + fun originalTake() = runBlocking { + (0L..Long.MAX_VALUE).asFlow().originalTake(size).consume() + } + + @Benchmark + fun fastPathTake() = runBlocking { + (0L..Long.MAX_VALUE).asFlow().fastPathTake(size).consume() + } + + @Benchmark + fun mergedStateMachine() = runBlocking { + (0L..Long.MAX_VALUE).asFlow().mergedStateMachineTake(size).consume() + } + + + internal class StacklessCancellationException() : CancellationException() { + override fun fillInStackTrace(): Throwable = this + } + + public fun Flow.originalTake(count: Int): Flow { + return unsafeFlow { + var consumed = 0 + try { + collect { value -> + emit(value) + if (++consumed == count) { + throw StacklessCancellationException() + } + } + } catch (e: StacklessCancellationException) { + // Nothing, bail out + } + } + } + + + public fun Flow.fastPathTake(count: Int): Flow { + + suspend fun FlowCollector.emitAbort(value: T) { + emit(value) + throw StacklessCancellationException() + } + + return unsafeFlow { + var consumed = 0 + try { + collect { value -> + if (++consumed < count) { + return@collect emit(value) + } else { + return@collect emitAbort(value) + } + } + } catch (e: StacklessCancellationException) { + // Nothing, bail out + } + } + } + + + public fun Flow.mergedStateMachineTake(count: Int): Flow { + return unsafeFlow() { + try { + val takeCollector = FlowTakeCollector(count, this) + collect(takeCollector) + } catch (e: StacklessCancellationException) { + // Nothing, bail out + } + } + } + + + private class FlowTakeCollector( + private val count: Int, + downstream: FlowCollector + ) : FlowCollector, Continuation { + private var consumed = 0 + // Workaround for KT-30991 + private val emitFun = run { + val suspendFun: suspend (T) -> Unit = { downstream.emit(it) } + suspendFun as Function2, Any?> + } + + private var caller: Continuation? = null // lateinit + + override val context: CoroutineContext + get() = caller?.context ?: EmptyCoroutineContext + + override fun resumeWith(result: Result) { + val completion = caller!! + if (++consumed == count) completion.resumeWith(Result.failure(StacklessCancellationException())) + else completion.resumeWith(Result.success(Unit)) + } + + override suspend fun emit(value: T) = suspendCoroutineUninterceptedOrReturn sc@{ + // Invoke it in non-suspending way + caller = it + val result = emitFun.invoke(value, this) + if (result !== COROUTINE_SUSPENDED) { + if (++consumed == count) throw StacklessCancellationException() + else return@sc Unit + } + COROUTINE_SUSPENDED + } + } +} From 4816dd59df0968ad63885059bf92e66141f200a1 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 16 Sep 2019 18:09:12 +0300 Subject: [PATCH 2/3] Optimize Flow.take, allocate SM instance only once for the last flow element --- .../common/src/flow/operators/Limit.kt | 13 +++++++++--- .../common/test/flow/operators/TakeTest.kt | 21 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index 7f638f9814..02ddc74c2f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -51,13 +51,20 @@ public fun Flow.dropWhile(predicate: suspend (T) -> Boolean): Flow = f @ExperimentalCoroutinesApi public fun Flow.take(count: Int): Flow { require(count > 0) { "Requested element count $count should be positive" } + + suspend fun FlowCollector.emitAbort(value: T) { + emit(value) + throw AbortFlowException() + } + return flow { var consumed = 0 try { collect { value -> - emit(value) - if (++consumed == count) { - throw AbortFlowException() + if (++consumed < count) { + return@collect emit(value) + } else { + return@collect emitAbort(value) } } } catch (e: AbortFlowException) { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt index 711034969f..8ea137df08 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/TakeTest.kt @@ -21,6 +21,27 @@ class TakeTest : TestBase() { assertEquals(2, flow.drop(1).take(1).single()) } + @Test + fun testIllegalArgument() { + assertFailsWith { flowOf(1).take(0) } + assertFailsWith { flowOf(1).take(-1) } + } + + @Test + fun testTakeSuspending() = runTest { + val flow = flow { + emit(1) + yield() + emit(2) + yield() + } + + assertEquals(3, flow.take(2).sum()) + assertEquals(3, flow.take(Int.MAX_VALUE).sum()) + assertEquals(1, flow.take(1).single()) + assertEquals(2, flow.drop(1).take(1).single()) + } + @Test fun testEmptyFlow() = runTest { val sum = emptyFlow().take(10).sum() From 0bfb5566a39b2550811422c7effb287b73118482 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 17 Sep 2019 18:59:23 +0300 Subject: [PATCH 3/3] Get rid of local suspend function to avoid excess caching --- .../src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt | 11 ++++------- .../common/src/flow/operators/Limit.kt | 11 +++++------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt index be38f12b55..84afca2439 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/TakeBenchmark.kt @@ -47,7 +47,6 @@ open class TakeBenchmark { (0L..Long.MAX_VALUE).asFlow().mergedStateMachineTake(size).consume() } - internal class StacklessCancellationException() : CancellationException() { override fun fillInStackTrace(): Throwable = this } @@ -68,14 +67,12 @@ open class TakeBenchmark { } } + private suspend fun FlowCollector.emitAbort(value: T) { + emit(value) + throw StacklessCancellationException() + } public fun Flow.fastPathTake(count: Int): Flow { - - suspend fun FlowCollector.emitAbort(value: T) { - emit(value) - throw StacklessCancellationException() - } - return unsafeFlow { var consumed = 0 try { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index 02ddc74c2f..1343dad868 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -51,12 +51,6 @@ public fun Flow.dropWhile(predicate: suspend (T) -> Boolean): Flow = f @ExperimentalCoroutinesApi public fun Flow.take(count: Int): Flow { require(count > 0) { "Requested element count $count should be positive" } - - suspend fun FlowCollector.emitAbort(value: T) { - emit(value) - throw AbortFlowException() - } - return flow { var consumed = 0 try { @@ -73,6 +67,11 @@ public fun Flow.take(count: Int): Flow { } } +private suspend fun FlowCollector.emitAbort(value: T) { + emit(value) + throw AbortFlowException() +} + /** * Returns a flow that contains first elements satisfying the given [predicate]. */