Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce EXACTLY_ONCE contracts to coroutineScope, supervisorScope, … #2030

Merged
merged 1 commit into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions integration/kotlinx-coroutines-jdk8/src/time/Time.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines.time

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*
import java.time.*
import java.time.temporal.*
import kotlin.contracts.*

/**
* "java.time" adapter method for [kotlinx.coroutines.delay].
Expand Down Expand Up @@ -35,8 +38,12 @@ public fun <R> SelectBuilder<R>.onTimeout(duration: Duration, block: suspend ()
/**
* "java.time" adapter method for [kotlinx.coroutines.withTimeout].
*/
public suspend fun <T> withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T =
kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block)
public suspend fun <T> withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block)
}

/**
* "java.time" adapter method for [kotlinx.coroutines.withTimeoutOrNull].
Expand Down
49 changes: 28 additions & 21 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -134,31 +136,36 @@ private class LazyDeferredCoroutine<T>(
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}

/**
Expand Down
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

Expand Down Expand Up @@ -183,11 +185,15 @@ public object GlobalScope : CoroutineScope {
* or may throw a corresponding unhandled [Throwable] if there is any unhandled exception in this scope
* (for example, from a crashed coroutine that was started with [launch][CoroutineScope.launch] in this scope).
*/
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

/**
* Creates a [CoroutineScope] that wraps the given coroutine [context].
Expand Down
11 changes: 8 additions & 3 deletions kotlinx-coroutines-core/common/src/Supervisor.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:OptIn(ExperimentalContracts::class)
@file:Suppress("DEPRECATION_ERROR")

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -47,11 +48,15 @@ public fun SupervisorJob0(parent: Job? = null) : Job = SupervisorJob(parent)
* A failure of the scope itself (exception thrown in the [block] or cancellation) fails the scope with all its children,
* but does not cancel parent job.
*/
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = SupervisorCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
Expand Down
13 changes: 11 additions & 2 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand All @@ -27,6 +29,9 @@ import kotlin.time.*
* @param timeMillis timeout time in milliseconds.
*/
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
Expand All @@ -46,8 +51,12 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T =
withTimeout(timeout.toDelayMillis(), block)
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return withTimeout(timeout.toDelayMillis(), block)
}

/**
* Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns
Expand Down
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines.selects

Expand All @@ -10,6 +11,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.sync.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -199,8 +201,11 @@ public interface SelectInstance<in R> {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
contract {
callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val scope = SelectBuilderImpl(uCont)
try {
builder(scope)
Expand All @@ -209,6 +214,7 @@ public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() ->
}
scope.getResult()
}
}


@SharedImmutable
Expand Down
52 changes: 52 additions & 0 deletions kotlinx-coroutines-core/common/test/BuilderContractsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.selects.*
import kotlin.test.*

class BuilderContractsTest : TestBase() {

@Test
fun testContracts() = runTest {
// Coroutine scope
val cs: Int
coroutineScope {
cs = 42
}
consume(cs)

// Supervisor scope
val svs: Int
supervisorScope {
svs = 21
}
consume(svs)

// with context scope
val wctx: Int
withContext(Dispatchers.Unconfined) {
wctx = 239
}
consume(wctx)

val wt: Int
withTimeout(Long.MAX_VALUE) {
wt = 123
}
consume(wt)

val s: Int
select<Unit> {
s = 42
Job().apply { complete() }.onJoin {}
}
consume(s)
}

private fun consume(a: Int) {
a.hashCode() // BE codegen verification
}
}
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*

/**
Expand All @@ -34,6 +36,9 @@ import kotlin.coroutines.*
*/
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
Expand Down
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,13 @@ class RunBlockingTest : TestBase() {

handle.dispose()
}

@Test
fun testContract() {
val rb: Int
runBlocking {
rb = 42
}
rb.hashCode() // unused
}
}