Skip to content

Commit

Permalink
Make Flow more cancellation friendly:
Browse files Browse the repository at this point in the history
    * flow builder is cancellable by default
    * New unambiguous currentContext top-level function
    * New Flow.cancellable() operator
    * Set of lint to catch programmer's errors in compile time

Fixes #2026
  • Loading branch information
qwwdfsad committed May 14, 2020
1 parent cfb5af4 commit d8933fc
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 7 deletions.
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public final class kotlinx/coroutines/CoroutineScopeKt {
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/CoroutineScope;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun coroutineScope (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun currentContext (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun ensureActive (Lkotlinx/coroutines/CoroutineScope;)V
public static final fun isActive (Lkotlinx/coroutines/CoroutineScope;)Z
public static final fun plus (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineScope;
Expand Down Expand Up @@ -873,6 +874,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -999,9 +1001,13 @@ public final class kotlinx/coroutines/flow/FlowKt {
}

public final class kotlinx/coroutines/flow/LintKt {
public static final fun cancel (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/flow/FlowCollector;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun getCoroutineContext (Lkotlinx/coroutines/flow/FlowCollector;)Lkotlin/coroutines/CoroutineContext;
public static final fun isActive (Lkotlinx/coroutines/flow/FlowCollector;)Z
}

public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
Expand Down
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,19 @@ public fun CoroutineScope.cancel(message: String, cause: Throwable? = null): Uni
* ```
*/
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()


/**
* Returns the current [CoroutineContext] retrieved by using [kotlin.coroutines.coroutineContext].
* This function is an alias to avoid name clash with [CoroutineScope.coroutineContext] in a receiver position:
*
* ```
* launch { // this: CoroutineScope
* val flow = flow<Unit> {
* coroutineContext // Resolves into the context of outer launch, which is incorrect, see KT
* currentContext() // Retrieves actual context whe the flow is collected
* }
* }
* ```
*/
public suspend inline fun currentContext(): CoroutineContext = coroutineContext
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* fibonacci().take(100).collect { println(it) }
* ```
* Emissions from [flow] builder are [cancellable] by default.
*
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
* For example, the following code will result in an [IllegalStateException]:
Expand Down
14 changes: 13 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
}
}

/**
* Returns a flow which checks cancellation status on each emission and throws
* the corresponding cancellation cause if flow collector was cancelled.
* Note that [flow] builder is [cancellable] by default.
*/
public fun <T> Flow<T>.cancellable(): Flow<T> = unsafeFlow {
collect {
currentContext().ensureActive()
emit(it)
}
}

/**
* The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
* This operator is context preserving and does not affect the context of the preceding and subsequent operations.
Expand Down Expand Up @@ -256,7 +268,7 @@ public fun <T, R> Flow<T>.flowWith(
* All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
* It is also necessary not to mess with cancellation if multiple flowWith are used.
*/
val originalContext = coroutineContext.minusKey(Job)
val originalContext = currentContext().minusKey(Job)
val prepared = source.flowOn(originalContext).buffer(bufferSize)
builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
return@collect emit(value)
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
val safeCollector = SafeCollector<T>(this, coroutineContext)
val safeCollector = SafeCollector<T>(this, currentContext())
try {
safeCollector.action()
} finally {
Expand Down Expand Up @@ -153,7 +153,7 @@ public fun <T> Flow<T>.onCompletion(
throw e
}
// Normal completion
SafeCollector(this, coroutineContext).invokeSafely(action, null)
SafeCollector(this, currentContext()).invokeSafely(action, null)
}

/**
Expand All @@ -178,7 +178,7 @@ public fun <T> Flow<T>.onEmpty(
emit(it)
}
if (isEmpty) {
val collector = SafeCollector(this, coroutineContext)
val collector = SafeCollector(this, currentContext())
try {
collector.action()
} finally {
Expand Down
29 changes: 29 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Lint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -41,3 +42,31 @@ public fun <T> StateFlow<T>.conflate(): Flow<T> = noImpl()
replaceWith = ReplaceWith("this")
)
public fun <T> StateFlow<T>.distinctUntilChanged(): Flow<T> = noImpl()

@Deprecated(
message = "isActive is resolved into the extension of outer CoroutineScope which is likely to be an error." +
"Use currentContext().isActive or cancellable() operator instead " +
"or specify the receiver of isActive explicitly. " +
"Additionally, flow {} builder emissions are cancellable by default.",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext().isActive")
)
public val FlowCollector<*>.isActive: Boolean
get() = noImpl()

@Deprecated(
message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." +
"Use currentContext().cancel() instead or specify the receiver of cancel() explicitly",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext().cancel(cause)")
)
public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl()

@Deprecated(
message = "coroutineContext is resolved into the property of outer CoroutineScope which is likely to be an error." +
"Use currentContext() instead or specify the receiver of coroutineContext explicitly",
level = DeprecationLevel.WARNING, // ERROR in 1.4
replaceWith = ReplaceWith("currentContext()")
)
public val FlowCollector<*>.coroutineContext: CoroutineContext
get() = noImpl()
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class CancellableTest : TestBase() {

@Test
fun testCancellable() = runTest {
var sum = 0
val flow = (0..1000).asFlow()
.onEach {
if (it != 0) currentContext().cancel()
sum += it
}

flow.launchIn(this).join()
assertEquals(500500, sum)

sum = 0
flow.cancellable().launchIn(this).join()
assertEquals(1, sum)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
}
launch {
expect(2)
yield()
job.cancel()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

Expand All @@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
private var lastEmissionContext: CoroutineContext? = null

override suspend fun emit(value: T) {
val currentContext = coroutineContext
val currentContext = currentContext()
currentContext.ensureActive()
if (lastEmissionContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
Expand Down Expand Up @@ -62,6 +63,7 @@ internal actual class SafeCollector<T> actual constructor(

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
Expand Down
46 changes: 46 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/FlowCancellationTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class FlowCancellationTest : TestBase() {

@Test
fun testEmitIsCooperative() = runTest {
val latch = Channel<Unit>(1)
val job = flow {
expect(1)
latch.send(Unit)
while (true) {
emit(42)
}
}.launchIn(this + Dispatchers.Default)

latch.receive()
expect(2)
job.cancelAndJoin()
finish(3)
}

@Test
fun testIsActiveOnCurrentContext() = runTest {
val latch = Channel<Unit>(1)
val job = flow<Unit> {
expect(1)
latch.send(Unit)
while (currentContext().isActive) {
// Do nothing
}
}.launchIn(this + Dispatchers.Default)

latch.receive()
expect(2)
job.cancelAndJoin()
finish(3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

Expand All @@ -17,7 +18,8 @@ internal actual class SafeCollector<T> actual constructor(
private var lastEmissionContext: CoroutineContext? = null

override suspend fun emit(value: T) {
val currentContext = coroutineContext
val currentContext = currentContext()
currentContext.ensureActive()
if (lastEmissionContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactive/src/Convert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlin.coroutines.*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
level = DeprecationLevel.WARNING,
level = DeprecationLevel.WARNING, // Error in 1.4
replaceWith = ReplaceWith("this.consumeAsFlow().asPublisher()"))
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> = publish(context) {
for (t in this@asPublisher)
Expand Down

0 comments on commit d8933fc

Please sign in to comment.