From e6f2df6ad21dc25ecaa6edd3ef873797aa608e70 Mon Sep 17 00:00:00 2001 From: Nikita Koval Date: Mon, 13 Jan 2020 13:56:29 +0300 Subject: [PATCH 1/5] Make the list of segments more abstract, so that it can be used for other synchronization and communication primitives --- .../src/internal/ConcurrentLinkedList.kt | 223 ++++++++++++++++++ .../common/src/internal/SegmentQueue.kt | 179 -------------- .../common/src/sync/Semaphore.kt | 67 +++--- .../jvm/test/internal/SegmentBasedQueue.kt | 85 ++++--- .../jvm/test/internal/SegmentListTest.kt | 41 ++++ .../jvm/test/internal/SegmentQueueTest.kt | 17 +- .../SegmentQueueLCStressTest.kt | 15 +- 7 files changed, 386 insertions(+), 241 deletions(-) create mode 100644 kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt delete mode 100644 kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt create mode 100644 kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt new file mode 100644 index 0000000000..1a0d872353 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -0,0 +1,223 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.native.concurrent.SharedImmutable + +// Returns the first segment `s` with `s.id >= id` or `CLOSED` +// if all the segments in this linked list have lower `id` and the list is closed for further segment additions. +private inline fun > S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed { + // Go through `next` references and add new segments if needed, + // similarly to the `push` in the Michael-Scott queue algorithm. + // The only difference is that `CAS failure` means that the + // required segment has already been added, so the algorithm just + // uses it. This way, only one segment with each id can be added. + var cur: S = this + while (cur.id < id || cur.removed) { + val nextOrClosed = cur.nextOrClosed + if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED) + val next: S = if (nextOrClosed.node === null) { + val newTail = createNewSegment(cur.id + 1, cur) + if (cur.trySetNext(newTail)) { + if (cur.removed) cur.remove() + newTail + } else { + val nextOrClosed = cur.nextOrClosed + if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED) + nextOrClosed.node!! + } + } else nextOrClosed.node!! + cur = next + } + return SegmentOrClosed(cur) +} + +// Returns `false` if the segment `to` is logically removed, `true` on successful update. +private inline fun > AtomicRef.moveForward(to: S): Boolean = loop { cur -> + if (cur.id >= to.id) return true + if (!to.tryIncPointers()) return false + if (compareAndSet(cur, to)) { + // the segment is moved + if (cur.decPointers()) cur.remove() + return true + } else { + if (to.decPointers()) to.remove() + } +} + +/** + * Tries to find a segment with the specified [id] following by next references from the + * [startFrom] segment and creating new ones if needed. The typical use-case is reading this `AtomicRef` values, + * doing some synchronization, and invoking this function to find the required segment and update the pointer. + * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed + * (e.g., queues should use it in dequeue operations). + * + * Since segments can be removed from the list, or it can be closed for further segment additions, this function + * returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id` + * and the list is closed. + */ +internal inline fun > AtomicRef.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed { + while (true) { + val s = startFrom.findSegmentInternal(id, createNewSegment) + if (s.isClosed || moveForward(s.segment)) return s + } +} + +/** + * Closes this linked list of nodes by forbidding adding new ones, + * returns the last node in the list. + */ +internal fun > N.close(): N { + var cur: N = this + while (true) { + val next = cur.nextOrClosed.run { if (isClosed) return cur else node } + if (next === null) { + if (cur.markAsClosed()) return cur + } else { + cur = next + } + } +} + +internal abstract class ConcurrentLinkedListNode>(prev: N?) { + // Pointer to the next node, updates similarly to the Michael-Scott queue algorithm. + private val _next = atomic(null) + val nextOrClosed: NextNodeOrClosed get() = NextNodeOrClosed(_next.value) + fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value) + + /** + * Checks whether this node is the physical tail of the current linked list. + */ + val isTail: Boolean get() = _next.value.let { it === null || it === CLOSED } + + // Pointer to the previous node, updates in [remove] function. + private val _prev = atomic(prev) + val prev: N? get() = _prev.value + + /** + * Cleans the pointer to the previous node. + */ + fun cleanPrev() { _prev.lazySet(null) } + + /** + * Tries to mark the linked list as closed by forbidding adding new nodes after this one. + */ + fun markAsClosed() = _next.compareAndSet(null, CLOSED) + + /** + * Checks whether this node is a physical tail and is closed for further node additions. + */ + val isClosed get() = _next.value === CLOSED + + /** + * This property indicates whether the current node is logically removed. + * The expected use-case is removing the node logically (so that [removed] becomes true), + * and invoking [remove] after that. Note that this implementation relies on the contract + * that the physical tail cannot be logically removed. Please, do not break this contract; + * otherwise, memory leaks and unexpected behavior can occur. + */ + abstract val removed: Boolean + + /** + * Removes this node physically from this linked list. The node should be + * logically removed (so [removed] returns `true`) at the point of invocation. + */ + fun remove() { + assert { removed } // The node should be logically removed at first. + assert { nextOrClosed.node !== null } // The physical tail cannot be removed. + while (true) { + // Read `next` and `prev` pointers ignoring logically removed nodes. + val prev = leftmostAliveNode + val next = rightmostAliveNode + // Link `next` and `prev`. + next._prev.value = prev + if (prev !== null) prev._next.value = next + // Check that prev and next are still alive. + if (next.removed) continue + if (prev !== null && prev.removed) continue + // This node is removed. + return + } + } + + private val leftmostAliveNode: N? get() { + var cur = prev + while (cur !== null && cur.removed) + cur = cur._prev.value + return cur + } + + private val rightmostAliveNode: N get() { + assert { !isTail } // Should not be invoked on the tail node + var cur = nextOrClosed.node!! + while (cur.removed) + cur = cur.nextOrClosed.node!! + return cur + } +} + +/** + * Each segment in the list has a unique id and is created by the provided to [findSegmentAndMoveForward] method. + * Essentially, this is a node in the Michael-Scott queue algorithm, + * but with maintaining [prev] pointer for efficient [remove] implementation. + */ +internal abstract class Segment>(val id: Long, prev: S?, pointers: Int): ConcurrentLinkedListNode(prev) { + /** + * This property should return the maximal number of slots in this segment, + * it is used to define whether the segment is logically removed. + */ + abstract val maxSlots: Int + + /** + * Numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits) + */ + private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT) + + /** + * The segment is considered as removed if all the slots are cleaned, + * there is no pointers to this segment from outside, and + * it is not a physical tail in the linked list of segments. + */ + override val removed get() = cleanedAndPointers.value == maxSlots && !isTail + + // increments the number of pointers if this segment is not logically removed. + internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail } + + // returns `true` if this segment is logically removed after the decrement. + internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail + + /** + * This functions should be invoked on each slot clean-up; + * should not be invoked twice for the same slot. + */ + fun onSlotCleaned() { + if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove() + } +} + +private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) -> Boolean): Boolean { + while (true) { + val cur = this.value + if (!condition(cur)) return false + if (this.compareAndSet(cur, cur + delta)) return true + } +} + +internal inline class SegmentOrClosed>(private val value: Any?) { + val isClosed: Boolean get() = value === CLOSED + val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S +} + +internal inline class NextNodeOrClosed>(private val value: Any?) { + val isClosed: Boolean get() = value === CLOSED + val node: N? get() = if (isClosed) null else value as N? +} + +private const val POINTERS_SHIFT = 16 + +@SharedImmutable +private val CLOSED = Symbol("CLOSED") \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt b/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt deleted file mode 100644 index 0091d13671..0000000000 --- a/kotlinx-coroutines-core/common/src/internal/SegmentQueue.kt +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* - -/** - * Essentially, this segment queue is an infinite array of segments, which is represented as - * a Michael-Scott queue of them. All segments are instances of [Segment] class and - * follow in natural order (see [Segment.id]) in the queue. - */ -internal abstract class SegmentQueue>() { - private val _head: AtomicRef - /** - * Returns the first segment in the queue. - */ - protected val head: S get() = _head.value - - private val _tail: AtomicRef - /** - * Returns the last segment in the queue. - */ - protected val tail: S get() = _tail.value - - init { - val initialSegment = newSegment(0) - _head = atomic(initialSegment) - _tail = atomic(initialSegment) - } - - /** - * The implementation should create an instance of segment [S] with the specified id - * and initial reference to the previous one. - */ - abstract fun newSegment(id: Long, prev: S? = null): S - - /** - * Finds a segment with the specified [id] following by next references from the - * [startFrom] segment. The typical use-case is reading [tail] or [head], doing some - * synchronization, and invoking [getSegment] or [getSegmentAndMoveHead] correspondingly - * to find the required segment. - */ - protected fun getSegment(startFrom: S, id: Long): S? { - // Go through `next` references and add new segments if needed, - // similarly to the `push` in the Michael-Scott queue algorithm. - // The only difference is that `CAS failure` means that the - // required segment has already been added, so the algorithm just - // uses it. This way, only one segment with each id can be in the queue. - var cur: S = startFrom - while (cur.id < id) { - var curNext = cur.next - if (curNext == null) { - // Add a new segment. - val newTail = newSegment(cur.id + 1, cur) - curNext = if (cur.casNext(null, newTail)) { - if (cur.removed) { - cur.remove() - } - moveTailForward(newTail) - newTail - } else { - cur.next!! - } - } - cur = curNext - } - if (cur.id != id) return null - return cur - } - - /** - * Invokes [getSegment] and replaces [head] with the result if its [id] is greater. - */ - protected fun getSegmentAndMoveHead(startFrom: S, id: Long): S? { - @Suppress("LeakingThis") - if (startFrom.id == id) return startFrom - val s = getSegment(startFrom, id) ?: return null - moveHeadForward(s) - return s - } - - /** - * Updates [head] to the specified segment - * if its `id` is greater. - */ - private fun moveHeadForward(new: S) { - _head.loop { curHead -> - if (curHead.id > new.id) return - if (_head.compareAndSet(curHead, new)) { - new.prev.value = null - return - } - } - } - - /** - * Updates [tail] to the specified segment - * if its `id` is greater. - */ - private fun moveTailForward(new: S) { - _tail.loop { curTail -> - if (curTail.id > new.id) return - if (_tail.compareAndSet(curTail, new)) return - } - } -} - -/** - * Each segment in [SegmentQueue] has a unique id and is created by [SegmentQueue.newSegment]. - * Essentially, this is a node in the Michael-Scott queue algorithm, but with - * maintaining [prev] pointer for efficient [remove] implementation. - */ -internal abstract class Segment>(val id: Long, prev: S?) { - // Pointer to the next segment, updates similarly to the Michael-Scott queue algorithm. - private val _next = atomic(null) - val next: S? get() = _next.value - fun casNext(expected: S?, value: S?): Boolean = _next.compareAndSet(expected, value) - // Pointer to the previous segment, updates in [remove] function. - val prev = atomic(null) - - /** - * Returns `true` if this segment is logically removed from the queue. - * The [remove] function should be called right after it becomes logically removed. - */ - abstract val removed: Boolean - - init { - this.prev.value = prev - } - - /** - * Removes this segment physically from the segment queue. The segment should be - * logically removed (so [removed] returns `true`) at the point of invocation. - */ - fun remove() { - assert { removed } // The segment should be logically removed at first - // Read `next` and `prev` pointers. - var next = this._next.value ?: return // tail cannot be removed - var prev = prev.value ?: return // head cannot be removed - // Link `next` and `prev`. - prev.moveNextToRight(next) - while (prev.removed) { - prev = prev.prev.value ?: break - prev.moveNextToRight(next) - } - next.movePrevToLeft(prev) - while (next.removed) { - next = next.next ?: break - next.movePrevToLeft(prev) - } - } - - /** - * Updates [next] pointer to the specified segment if - * the [id] of the specified segment is greater. - */ - private fun moveNextToRight(next: S) { - while (true) { - val curNext = this._next.value as S - if (next.id <= curNext.id) return - if (this._next.compareAndSet(curNext, next)) return - } - } - - /** - * Updates [prev] pointer to the specified segment if - * the [id] of the specified segment is lower. - */ - private fun movePrevToLeft(prev: S) { - while (true) { - val curPrev = this.prev.value ?: return - if (curPrev.id <= prev.id) return - if (this.prev.compareAndSet(curPrev, prev)) return - } - } -} diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index aa7ed63d3d..7cdc736197 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -8,9 +8,8 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* -import kotlin.jvm.* import kotlin.math.* -import kotlin.native.concurrent.* +import kotlin.native.concurrent.SharedImmutable /** * A counting semaphore for coroutines that logically maintains a number of available permits. @@ -84,16 +83,26 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T { } } -private class SemaphoreImpl( - private val permits: Int, acquiredPermits: Int -) : Semaphore, SegmentQueue() { +private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { + + // The queue of waiting acquirers is essentially an infinite array based on the list of segments + // (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue + // and dequeue operation, we increment the corresponding counter at the beginning of the operation + // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair + // works with an individual cell.We use the corresponding segment pointer to find the required ones. + private val head: AtomicRef + private val deqIdx = atomic(0L) + private val tail: AtomicRef + private val enqIdx = atomic(0L) + init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } + val s = SemaphoreSegment(0, null, 2) + head = atomic(s) + tail = atomic(s) } - override fun newSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev) - /** * This counter indicates a number of available permits if it is non-negative, * or the size with minus sign otherwise. Note, that 32-bit counter is enough here @@ -104,14 +113,6 @@ private class SemaphoreImpl( private val _availablePermits = atomic(permits - acquiredPermits) override val availablePermits: Int get() = max(_availablePermits.value, 0) - // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`; - // each segment contains a fixed number of slots. To determine a slot for each enqueue - // and dequeue operation, we increment the corresponding counter at the beginning of the operation - // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair - // works with an individual cell. - private val enqIdx = atomic(0L) - private val deqIdx = atomic(0L) - override fun tryAcquire(): Boolean { _availablePermits.loop { p -> if (p <= 0) return false @@ -136,12 +137,13 @@ private class SemaphoreImpl( cur + 1 } - private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable sc@ { cont -> - val last = this.tail + private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable sc@{ cont -> + val curTail = this.tail.value val enqIdx = enqIdx.getAndIncrement() - val segment = getSegment(last, enqIdx / SEGMENT_SIZE) + val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, + createNewSegment = ::createSegment).run { segment } // cannot be closed val i = (enqIdx % SEGMENT_SIZE).toInt() - if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { + if (segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { // already resumed cont.resume(Unit) return@sc @@ -151,10 +153,17 @@ private class SemaphoreImpl( @Suppress("UNCHECKED_CAST") internal fun resumeNextFromQueue() { - try_again@while (true) { - val first = this.head + try_again@ while (true) { + val curHead = this.head.value val deqIdx = deqIdx.getAndIncrement() - val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again + val id = deqIdx / SEGMENT_SIZE + val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, + createNewSegment = ::createSegment).run { segment } // cannot be closed + segment.cleanPrev() + if (segment.id > id) { + this.deqIdx.updateIfLower(segment.id * SEGMENT_SIZE) + continue@try_again + } val i = (deqIdx % SEGMENT_SIZE).toInt() val cont = segment.getAndSet(i, RESUMED) if (cont === null) return // just resumed @@ -165,6 +174,10 @@ private class SemaphoreImpl( } } +private inline fun AtomicLong.updateIfLower(value: Long): Unit = loop { cur -> + if (cur >= value || compareAndSet(cur, value)) return +} + private class CancelSemaphoreAcquisitionHandler( private val semaphore: SemaphoreImpl, private val segment: SemaphoreSegment, @@ -180,10 +193,11 @@ private class CancelSemaphoreAcquisitionHandler( override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]" } -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment(id, prev) { +private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) + +private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) { val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - private val cancelledSlots = atomic(0) - override val removed get() = cancelledSlots.value == SEGMENT_SIZE + override val maxSlots: Int get() = SEGMENT_SIZE @Suppress("NOTHING_TO_INLINE") inline fun get(index: Int): Any? = acquirers[index].value @@ -200,8 +214,7 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment : SegmentQueue>() { - override fun newSegment(id: Long, prev: OneElementSegment?): OneElementSegment = OneElementSegment(id, prev) +internal class SegmentBasedQueue { + private val head: AtomicRef> + private val tail: AtomicRef> private val enqIdx = atomic(0L) private val deqIdx = atomic(0L) - // Returns the segments associated with the enqueued element. - fun enqueue(element: T): OneElementSegment { + init { + val s = OneElementSegment(0, null, 2) + head = atomic(s) + tail = atomic(s) + } + + // Returns the segments associated with the enqueued element, or `null` if the queue is closed. + fun enqueue(element: T): OneElementSegment? { while (true) { - var tail = this.tail + val curTail = this.tail.value val enqIdx = this.enqIdx.getAndIncrement() - tail = getSegment(tail, enqIdx) ?: continue - if (tail.element.value === BROKEN) continue - if (tail.element.compareAndSet(null, element)) return tail + val segmentOrClosed = this.tail.findSegmentAndMoveForward(id = enqIdx, startFrom = curTail, createNewSegment = ::createSegment) + if (segmentOrClosed.isClosed) return null + val s = segmentOrClosed.segment + if (s.element.value === BROKEN) continue + if (s.element.compareAndSet(null, element)) return s } } fun dequeue(): T? { while (true) { if (this.deqIdx.value >= this.enqIdx.value) return null - var firstSegment = this.head + val curHead = this.head.value val deqIdx = this.deqIdx.getAndIncrement() - firstSegment = getSegmentAndMoveHead(firstSegment, deqIdx) ?: continue - var el = firstSegment.element.value + val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment) + if (segmentOrClosed.isClosed) return null + val s = segmentOrClosed.segment + s.cleanPrev() + if (s.id > deqIdx) continue + var el = s.element.value if (el === null) { - if (firstSegment.element.compareAndSet(null, BROKEN)) continue - else el = firstSegment.element.value + if (s.element.compareAndSet(null, BROKEN)) continue + else el = s.element.value } - if (el === REMOVED) continue + if (el === BROKEN) continue return el as T } } + // `enqueue` should return `null` after the queue is closed + fun close(): OneElementSegment { + val s = this.tail.value.close() + var cur = s + while (true) { + cur.element.compareAndSet(null, BROKEN) + cur = cur.prev ?: break + } + return s + } + val numberOfSegments: Int get() { - var s: OneElementSegment? = head - var i = 0 - while (s != null) { - s = s.next + var cur = head.value + var i = 1 + while (true) { + cur = cur.nextOrClosed.run { + if (isClosed || node === null) return i + this.node!! + } i++ } - return i + } + + fun checkHeadPrevIsCleaned() { + check(head.value.prev === null) } } -internal class OneElementSegment(id: Long, prev: OneElementSegment?) : Segment>(id, prev) { +private fun createSegment(id: Long, prev: OneElementSegment?) = OneElementSegment(id, prev, 0) + +internal class OneElementSegment(id: Long, prev: OneElementSegment?, pointers: Int) : Segment>(id, prev, pointers) { val element = atomic(null) - override val removed get() = element.value === REMOVED + override val maxSlots get() = 1 fun removeSegment() { - element.value = REMOVED - remove() + element.value = BROKEN + onSlotCleaned() } } -private val BROKEN = Symbol("BROKEN") -private val REMOVED = Symbol("REMOVED") \ No newline at end of file +private val BROKEN = Symbol("BROKEN") \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt new file mode 100644 index 0000000000..42d6360c4f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt @@ -0,0 +1,41 @@ +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* +import org.junit.Test +import kotlin.test.* + +class SegmentListTest { + @Test + fun testRemoveTail() { + val initialSegment = TestSegment(0, null, 2) + val head = AtomicRefHolder(initialSegment) + val tail = AtomicRefHolder(initialSegment) + val s1 = tail.ref.findSegmentAndMoveForward(1, tail.ref.value, ::createTestSegment).segment + assertFalse(s1.removed) + tail.ref.value.onSlotCleaned() + assertFalse(s1.removed) + head.ref.findSegmentAndMoveForward(2, head.ref.value, ::createTestSegment) + assertFalse(s1.removed) + tail.ref.findSegmentAndMoveForward(2, head.ref.value, ::createTestSegment) + assertTrue(s1.removed) + } + + @Test + fun testClose() { + val initialSegment = TestSegment(0, null, 2) + val head = AtomicRefHolder(initialSegment) + val tail = AtomicRefHolder(initialSegment) + tail.ref.findSegmentAndMoveForward(1, tail.ref.value, ::createTestSegment) + assertEquals(tail.ref.value, tail.ref.value.close()) + assertTrue(head.ref.findSegmentAndMoveForward(2, head.ref.value, ::createTestSegment).isClosed) + } +} + +private class AtomicRefHolder(initialValue: T) { + val ref: AtomicRef = atomic(initialValue) +} + +private class TestSegment(id: Long, prev: TestSegment?, pointers: Int) : Segment(id, prev, pointers) { + override val maxSlots: Int get() = 1 +} +private fun createTestSegment(id: Long, prev: TestSegment?) = TestSegment(id, prev, 0) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueTest.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueTest.kt index b59a6488a0..fd2d329088 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentQueueTest.kt @@ -33,7 +33,7 @@ class SegmentQueueTest : TestBase() { val s = q.enqueue(2) q.enqueue(3) assertEquals(3, q.numberOfSegments) - s.removeSegment() + s!!.removeSegment() assertEquals(2, q.numberOfSegments) assertEquals(1, q.dequeue()) assertEquals(3, q.dequeue()) @@ -47,11 +47,21 @@ class SegmentQueueTest : TestBase() { val s = q.enqueue(2) assertEquals(1, q.dequeue()) q.enqueue(3) - s.removeSegment() + s!!.removeSegment() assertEquals(3, q.dequeue()) assertNull(q.dequeue()) } + @Test + fun testClose() { + val q = SegmentBasedQueue() + q.enqueue(1) + assertEquals(0, q.close().id) + assertEquals(null, q.enqueue(2)) + assertEquals(1, q.dequeue()) + assertEquals(null, q.dequeue()) + } + @Test fun stressTest() { val q = SegmentBasedQueue() @@ -64,6 +74,7 @@ class SegmentQueueTest : TestBase() { expectedQueue.add(el) } else { // remove assertEquals(expectedQueue.poll(), q.dequeue()) + q.checkHeadPrevIsCleaned() } } } @@ -78,7 +89,7 @@ class SegmentQueueTest : TestBase() { val N = 100_000 * stressTestMultiplier val T = 10 val q = SegmentBasedQueue() - val segments = (1..N).map { q.enqueue(it) }.toMutableList() + val segments = (1..N).map { q.enqueue(it)!! }.toMutableList() if (random) segments.shuffle() assertEquals(N, q.numberOfSegments) val nextSegmentIndex = AtomicInteger() diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt index 1bb51a568f..89bf8dfaa4 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt @@ -18,12 +18,17 @@ class SegmentQueueLCStressTest : VerifierState() { private val q = SegmentBasedQueue() @Operation - fun add(@Param(name = "value") value: Int) { - q.enqueue(value) + fun enqueue(@Param(name = "value") x: Int): Boolean { + return q.enqueue(x) !== null } @Operation - fun poll(): Int? = q.dequeue() + fun dequeue(): Int? = q.dequeue() + + @Operation + fun close() { + q.close() + } override fun extractState(): Any { val elements = ArrayList() @@ -31,8 +36,8 @@ class SegmentQueueLCStressTest : VerifierState() { val x = q.dequeue() ?: break elements.add(x) } - - return elements + val closed = q.enqueue(0) === null + return elements to closed } @Test From 7a77628baa2d78e1e25ebaaebe72c8dcdb13687b Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 10 Apr 2020 10:56:26 +0300 Subject: [PATCH 2/5] ~ Minor code style improvement (less duplication) --- .../src/internal/ConcurrentLinkedList.kt | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 1a0d872353..990ab93cac 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -20,18 +20,16 @@ private inline fun > S.findSegmentInternal(id: Long, createNewSeg while (cur.id < id || cur.removed) { val nextOrClosed = cur.nextOrClosed if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED) - val next: S = if (nextOrClosed.node === null) { - val newTail = createNewSegment(cur.id + 1, cur) - if (cur.trySetNext(newTail)) { - if (cur.removed) cur.remove() - newTail - } else { - val nextOrClosed = cur.nextOrClosed - if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED) - nextOrClosed.node!! - } - } else nextOrClosed.node!! - cur = next + val nextNode = nextOrClosed.node + if (nextNode != null) { // there is a next node -- move there + cur = nextNode + continue + } + val newTail = createNewSegment(cur.id + 1, cur) + if (cur.trySetNext(newTail)) { // successfully added new node -- move there + if (cur.removed) cur.remove() + cur = newTail + } } return SegmentOrClosed(cur) } @@ -40,13 +38,11 @@ private inline fun > S.findSegmentInternal(id: Long, createNewSeg private inline fun > AtomicRef.moveForward(to: S): Boolean = loop { cur -> if (cur.id >= to.id) return true if (!to.tryIncPointers()) return false - if (compareAndSet(cur, to)) { - // the segment is moved + if (compareAndSet(cur, to)) { // the segment is moved if (cur.decPointers()) cur.remove() return true - } else { - if (to.decPointers()) to.remove() } + if (to.decPointers()) to.remove() // undo tryIncPointers } /** From e6a492eab59b42359d6711befdc091d7e8364c6a Mon Sep 17 00:00:00 2001 From: Nikita Koval Date: Fri, 10 Apr 2020 15:20:43 +0300 Subject: [PATCH 3/5] Replace `nextOrClosed` with `nextOfIfClosed` --- .../src/internal/ConcurrentLinkedList.kt | 46 ++++++++++--------- .../jvm/test/internal/SegmentBasedQueue.kt | 5 +- .../jvm/test/internal/SegmentListTest.kt | 2 +- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 990ab93cac..1499f87d30 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -18,11 +18,9 @@ private inline fun > S.findSegmentInternal(id: Long, createNewSeg // uses it. This way, only one segment with each id can be added. var cur: S = this while (cur.id < id || cur.removed) { - val nextOrClosed = cur.nextOrClosed - if (nextOrClosed.isClosed) return SegmentOrClosed(CLOSED) - val nextNode = nextOrClosed.node - if (nextNode != null) { // there is a next node -- move there - cur = nextNode + val next = cur.nextOrIfClosed { return SegmentOrClosed(CLOSED) } + if (next != null) { // there is a next node -- move there + cur = next continue } val newTail = createNewSegment(cur.id + 1, cur) @@ -70,7 +68,7 @@ internal inline fun > AtomicRef.findSegmentAndMoveForward(id: internal fun > N.close(): N { var cur: N = this while (true) { - val next = cur.nextOrClosed.run { if (isClosed) return cur else node } + val next = cur.nextOrIfClosed { return cur } if (next === null) { if (cur.markAsClosed()) return cur } else { @@ -82,13 +80,29 @@ internal fun > N.close(): N { internal abstract class ConcurrentLinkedListNode>(prev: N?) { // Pointer to the next node, updates similarly to the Michael-Scott queue algorithm. private val _next = atomic(null) - val nextOrClosed: NextNodeOrClosed get() = NextNodeOrClosed(_next.value) + private val nextInternal get() = _next.value + val next: N? get() = nextInternal.let { if (it === CLOSED) null else it as N? } + + /** + * Returns the next segment or `null` of the one does not exists, + * and invokes [onClosedAction] if this segment is marked as closed. + */ + inline fun nextOrIfClosed(onClosedAction: () -> Unit): N? = nextInternal.let { + if (it === CLOSED) { + onClosedAction() + null + } else it as N? + } + + /** + * Tries to set the next segment if it is not specified and this segment is not marked as closed. + */ fun trySetNext(value: N): Boolean = _next.compareAndSet(null, value) /** * Checks whether this node is the physical tail of the current linked list. */ - val isTail: Boolean get() = _next.value.let { it === null || it === CLOSED } + val isTail: Boolean get() = next === null // Pointer to the previous node, updates in [remove] function. private val _prev = atomic(prev) @@ -104,11 +118,6 @@ internal abstract class ConcurrentLinkedListNode */ fun markAsClosed() = _next.compareAndSet(null, CLOSED) - /** - * Checks whether this node is a physical tail and is closed for further node additions. - */ - val isClosed get() = _next.value === CLOSED - /** * This property indicates whether the current node is logically removed. * The expected use-case is removing the node logically (so that [removed] becomes true), @@ -124,7 +133,7 @@ internal abstract class ConcurrentLinkedListNode */ fun remove() { assert { removed } // The node should be logically removed at first. - assert { nextOrClosed.node !== null } // The physical tail cannot be removed. + assert { !isTail } // The physical tail cannot be removed. while (true) { // Read `next` and `prev` pointers ignoring logically removed nodes. val prev = leftmostAliveNode @@ -149,9 +158,9 @@ internal abstract class ConcurrentLinkedListNode private val rightmostAliveNode: N get() { assert { !isTail } // Should not be invoked on the tail node - var cur = nextOrClosed.node!! + var cur = next!! while (cur.removed) - cur = cur.nextOrClosed.node!! + cur = cur.next!! return cur } } @@ -208,11 +217,6 @@ internal inline class SegmentOrClosed>(private val value: Any?) { val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S } -internal inline class NextNodeOrClosed>(private val value: Any?) { - val isClosed: Boolean get() = value === CLOSED - val node: N? get() = if (isClosed) null else value as N? -} - private const val POINTERS_SHIFT = 16 @SharedImmutable diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt index cfa72d3a9d..6a3dce39ab 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt @@ -74,10 +74,7 @@ internal class SegmentBasedQueue { var cur = head.value var i = 1 while (true) { - cur = cur.nextOrClosed.run { - if (isClosed || node === null) return i - this.node!! - } + cur = cur.next ?: return i i++ } } diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt index 42d6360c4f..ff6a346cda 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentListTest.kt @@ -32,7 +32,7 @@ class SegmentListTest { } private class AtomicRefHolder(initialValue: T) { - val ref: AtomicRef = atomic(initialValue) + val ref = atomic(initialValue) } private class TestSegment(id: Long, prev: TestSegment?, pointers: Int) : Segment(id, prev, pointers) { From 5cb57ac0a089985f8b9f9b68d1ee60b376ba8219 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 10 Apr 2020 18:07:14 +0300 Subject: [PATCH 4/5] ~ Stylistic updates and a bit refactoring --- .../src/internal/ConcurrentLinkedList.kt | 75 ++++++++++++------- .../jvm/test/internal/SegmentBasedQueue.kt | 5 ++ 2 files changed, 51 insertions(+), 29 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 1499f87d30..2f3e140af5 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -6,16 +6,21 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* -import kotlin.native.concurrent.SharedImmutable - -// Returns the first segment `s` with `s.id >= id` or `CLOSED` -// if all the segments in this linked list have lower `id` and the list is closed for further segment additions. -private inline fun > S.findSegmentInternal(id: Long, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed { - // Go through `next` references and add new segments if needed, - // similarly to the `push` in the Michael-Scott queue algorithm. - // The only difference is that `CAS failure` means that the - // required segment has already been added, so the algorithm just - // uses it. This way, only one segment with each id can be added. +import kotlin.native.concurrent.* + +/** + * Returns the first segment `s` with `s.id >= id` or `CLOSED` + * if all the segments in this linked list have lower `id`, and the list is closed for further segment additions. + */ +private inline fun > S.findSegmentInternal( + id: Long, + createNewSegment: (id: Long, prev: S?) -> S +): SegmentOrClosed { + /* + Go through `next` references and add new segments if needed, similarly to the `push` in the Michael-Scott + queue algorithm. The only difference is that "CAS failure" means that the required segment has already been + added, so the algorithm just uses it. This way, only one segment with each id can be added. + */ var cur: S = this while (cur.id < id || cur.removed) { val next = cur.nextOrIfClosed { return SegmentOrClosed(CLOSED) } @@ -32,7 +37,10 @@ private inline fun > S.findSegmentInternal(id: Long, createNewSeg return SegmentOrClosed(cur) } -// Returns `false` if the segment `to` is logically removed, `true` on successful update. +/** + * Returns `false` if the segment `to` is logically removed, `true` on a successful update. + */ +@Suppress("NOTHING_TO_INLINE") // Must be inline because it is an AtomicRef extension private inline fun > AtomicRef.moveForward(to: S): Boolean = loop { cur -> if (cur.id >= to.id) return true if (!to.tryIncPointers()) return false @@ -50,11 +58,15 @@ private inline fun > AtomicRef.moveForward(to: S): Boolean = l * At the same time, [Segment.cleanPrev] should also be invoked if the previous segments are no longer needed * (e.g., queues should use it in dequeue operations). * - * Since segments can be removed from the list, or it can be closed for further segment additions, this function - * returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id` + * Since segments can be removed from the list, or it can be closed for further segment additions. + * Returns the segment `s` with `s.id >= id` or `CLOSED` if all the segments in this linked list have lower `id`, * and the list is closed. */ -internal inline fun > AtomicRef.findSegmentAndMoveForward(id: Long, startFrom: S, createNewSegment: (id: Long, prev: S?) -> S): SegmentOrClosed { +internal inline fun > AtomicRef.findSegmentAndMoveForward( + id: Long, + startFrom: S, + createNewSegment: (id: Long, prev: S?) -> S +): SegmentOrClosed { while (true) { val s = startFrom.findSegmentInternal(id, createNewSegment) if (s.isClosed || moveForward(s.segment)) return s @@ -80,20 +92,26 @@ internal fun > N.close(): N { internal abstract class ConcurrentLinkedListNode>(prev: N?) { // Pointer to the next node, updates similarly to the Michael-Scott queue algorithm. private val _next = atomic(null) - private val nextInternal get() = _next.value - val next: N? get() = nextInternal.let { if (it === CLOSED) null else it as N? } + // Pointer to the previous node, updates in [remove] function. + private val _prev = atomic(prev) + + private val nextOrClosed get() = _next.value /** - * Returns the next segment or `null` of the one does not exists, + * Returns the next segment or `null` of the one does not exist, * and invokes [onClosedAction] if this segment is marked as closed. */ - inline fun nextOrIfClosed(onClosedAction: () -> Unit): N? = nextInternal.let { + @Suppress("UNCHECKED_CAST") + inline fun nextOrIfClosed(onClosedAction: () -> Nothing): N? = nextOrClosed.let { if (it === CLOSED) { onClosedAction() - null - } else it as N? + } else { + it as N? + } } + val next: N? get() = nextOrIfClosed { return null } + /** * Tries to set the next segment if it is not specified and this segment is not marked as closed. */ @@ -102,10 +120,8 @@ internal abstract class ConcurrentLinkedListNode /** * Checks whether this node is the physical tail of the current linked list. */ - val isTail: Boolean get() = next === null + val isTail: Boolean get() = next == null - // Pointer to the previous node, updates in [remove] function. - private val _prev = atomic(prev) val prev: N? get() = _prev.value /** @@ -141,7 +157,7 @@ internal abstract class ConcurrentLinkedListNode // Link `next` and `prev`. next._prev.value = prev if (prev !== null) prev._next.value = next - // Check that prev and next are still alive. + // Checks that prev and next are still alive. if (next.removed) continue if (prev !== null && prev.removed) continue // This node is removed. @@ -178,13 +194,13 @@ internal abstract class Segment>(val id: Long, prev: S?, pointers abstract val maxSlots: Int /** - * Numbers of cleaned slots (lowest bits) and AtomicRef pointers to this segment (highest bits) + * Numbers of cleaned slots (the lowest bits) and AtomicRef pointers to this segment (the highest bits) */ private val cleanedAndPointers = atomic(pointers shl POINTERS_SHIFT) /** - * The segment is considered as removed if all the slots are cleaned, - * there is no pointers to this segment from outside, and + * The segment is considered as removed if all the slots are cleaned. + * There are no pointers to this segment from outside, and * it is not a physical tail in the linked list of segments. */ override val removed get() = cleanedAndPointers.value == maxSlots && !isTail @@ -196,8 +212,7 @@ internal abstract class Segment>(val id: Long, prev: S?, pointers internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail /** - * This functions should be invoked on each slot clean-up; - * should not be invoked twice for the same slot. + * Invoked on each slot clean-up; should not be invoked twice for the same slot. */ fun onSlotCleaned() { if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove() @@ -212,8 +227,10 @@ private inline fun AtomicInt.addConditionally(delta: Int, condition: (cur: Int) } } +@Suppress("EXPERIMENTAL_FEATURE_WARNING") // We are using inline class only internally, so it is Ok internal inline class SegmentOrClosed>(private val value: Any?) { val isClosed: Boolean get() = value === CLOSED + @Suppress("UNCHECKED_CAST") val segment: S get() = if (value === CLOSED) error("Does not contain segment") else value as S } diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt index 6a3dce39ab..3d1305c682 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt @@ -1,3 +1,7 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + package kotlinx.coroutines.internal import kotlinx.atomicfu.* @@ -55,6 +59,7 @@ internal class SegmentBasedQueue { else el = s.element.value } if (el === BROKEN) continue + @Suppress("UNCHECKED_CAST") return el as T } } From 05adab2fdd200785383f022f870b988182d81bd0 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 10 Apr 2020 19:31:03 +0300 Subject: [PATCH 5/5] ~ fix import (again) --- .../common/src/internal/ConcurrentLinkedList.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index 2f3e140af5..128a1998ef 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -6,7 +6,7 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* -import kotlin.native.concurrent.* +import kotlin.native.concurrent.SharedImmutable /** * Returns the first segment `s` with `s.id >= id` or `CLOSED`