Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No initial emit under locks in share(replay: 1, scope: .whileConnected) #2654

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
12 changes: 7 additions & 5 deletions RxSwift/Observables/Multicast.swift
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,15 @@ final private class ConnectableObservableAdapter<Subject: SubjectType>
}

private var lazySubject: Subject {
if let subject = self.subject {
lock.performLocked {
if let subject = self.subject {
return subject
}

let subject = self.makeSubject()
self.subject = subject
return subject
}

let subject = self.makeSubject()
self.subject = subject
return subject
}

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
Expand Down
26 changes: 10 additions & 16 deletions RxSwift/Observables/ShareReplayScope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private final class ShareReplay1WhileConnectedConnection<Element>
private let lock: RecursiveLock
private var disposed: Bool = false
fileprivate var observers = Observers()
private var element: Element?
fileprivate var element: Element?

init(parent: Parent, lock: RecursiveLock) {
self.parent = parent
Expand Down Expand Up @@ -205,18 +205,6 @@ private final class ShareReplay1WhileConnectedConnection<Element>
self.subscription.setDisposable(self.parent.source.subscribe(self))
}

final func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.performLocked {
if let element = self.element {
observer.on(.next(element))
}

let disposeKey = self.observers.insert(observer.on)

return SubscriptionDisposable(owner: self, key: disposeKey)
}
}

final private func synchronized_dispose() {
self.disposed = true
if self.parent.connection === self {
Expand Down Expand Up @@ -274,14 +262,20 @@ final private class ShareReplay1WhileConnected<Element>
let connection = self.synchronized_subscribe(observer)
let count = connection.observers.count

let disposable = connection.synchronized_subscribe(observer)
let disposeKey = connection.observers.insert(observer.on)

let initialValueToReplay = connection.element
self.lock.unlock()

if let initialValueToReplay {
observer.on(.next(initialValueToReplay))
}

if count == 0 {
connection.connect()
}

return disposable
return SubscriptionDisposable(owner: connection, key: disposeKey)
}

@inline(__always)
Expand Down Expand Up @@ -414,8 +408,8 @@ final private class ShareWhileConnected<Element>
let connection = self.synchronized_subscribe(observer)
let count = connection.observers.count

let disposable = connection.synchronized_subscribe(observer)
self.lock.unlock()
let disposable = connection.synchronized_subscribe(observer)

if count == 0 {
connection.connect()
Expand Down
84 changes: 84 additions & 0 deletions Tests/RxSwiftTests/Anomalies.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,88 @@ extension AnomaliesTest {
performSharingOperatorsTest(share: op)
}
}

func test2653ShareReplayOneInitialEmissionDeadlock() {
let immediatelyEmittingSource = Observable<Void>.create { observer in
observer.on(.next(()))
return Disposables.create()
}
.share(replay: 1, scope: .whileConnected)

let exp = createInitialEmissionsDeadlockExpectation(
sourceName: "`share(replay: 1, scope: .whileConnected)`",
immediatelyEmittingSource: immediatelyEmittingSource
)

wait(for: [exp], timeout: 5)
}

func test2653ShareReplayMoreInitialEmissionDeadlock() {
let immediatelyEmittingSource = Observable<Void>.create { observer in
observer.on(.next(()))
return Disposables.create()
}
.share(replay: 2, scope: .whileConnected)

let exp = createInitialEmissionsDeadlockExpectation(
sourceName: "`share(replay: 2, scope: .whileConnected)`",
immediatelyEmittingSource: immediatelyEmittingSource
)

wait(for: [exp], timeout: 5)
}

func test2653ShareReplayOneForeverInitialEmissionDeadlock() {
let immediatelyEmittingSource = Observable<Void>.create { observer in
observer.on(.next(()))
return Disposables.create()
}
.share(replay: 1, scope: .forever)

let exp = createInitialEmissionsDeadlockExpectation(
sourceName: "`share(replay: 1, scope: .forever)`",
immediatelyEmittingSource: immediatelyEmittingSource
)

wait(for: [exp], timeout: 5)
}

func test2653ShareReplayMoreForeverInitialEmissionDeadlock() {
let immediatelyEmittingSource = Observable<Void>.create { observer in
observer.on(.next(()))
return Disposables.create()
}
.share(replay: 2, scope: .forever)

let exp = createInitialEmissionsDeadlockExpectation(
sourceName: "`share(replay: 2, scope: .forever)`",
immediatelyEmittingSource: immediatelyEmittingSource
)

wait(for: [exp], timeout: 5)
}

private func createInitialEmissionsDeadlockExpectation(
sourceName: String,
immediatelyEmittingSource: Observable<Void>
) -> XCTestExpectation {
let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it doesn't keep its lock acquired to replay values upon subscription")

let triggerRange = 0..<1000

let multipleSubscriptions = Observable.zip(triggerRange.map { _ in
Observable.just(())
.observe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.flatMap { _ in
immediatelyEmittingSource
}
.take(1)
})

_ = multipleSubscriptions.subscribe(onCompleted: {
exp.fulfill()
})

return exp
}
}