Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread park/unpark synchronization #54174

Merged
merged 3 commits into from
Sep 19, 2018
Merged

Fix thread park/unpark synchronization #54174

merged 3 commits into from
Sep 19, 2018

Conversation

parched
Copy link
Contributor

@parched parched commented Sep 13, 2018

Previously the code below would not be guaranteed to exit when the
second unpark took the return, // already unparked path because there
was no write to synchronize with a read in park.

EDIT: doesn't actually require third thread

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}

I have some other ideas on how to improve the performance of park and unpark using fences, avoiding any atomic RMW when the state is already NOTIFIED, and also how to avoid calling notify_one without the mutex locked. But I need to write some micro benchmarks first, so I'll submit those changes at a later date if they prove to be faster.

Fixes #53366 I hope.

Previously the code below would not be guaranteed to exit when the first
spawned thread took the `return, // already unparked` path because there
was no write to synchronize with a read in `park`.

```
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}
```
@@ -800,7 +800,7 @@ pub fn park() {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
thread.inner.state.swap(EMPTY, SeqCst);
Copy link
Contributor

@willmo willmo Sep 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if this should be a .compare_exchange().unwrap(), just to be more explicit and make some use of the read value (thus possibly reducing the chance of somebody changing this back to .store() in the future). This shouldn't be a common case so I don't imagine the performance impact would be very noticeable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree, I will add a comment and an assertion.

@rust-highfive
Copy link
Collaborator

The job x86_64-gnu-llvm-5.0 of your PR failed on Travis (raw log). Through arcane magic we have determined that the following fragments from the build log may contain information about the problem.

Click to expand the log.
[00:48:24]    Compiling diff v0.1.11
[00:48:24]    Compiling filetime v0.2.1
[00:48:24]    Compiling env_logger v0.5.12
[00:48:27]    Compiling syn v0.14.9
The job exceeded the maximum time limit for jobs, and has been terminated.

I'm a bot! I can only do what humans tell me to, so if this was not helpful or you have suggestions for improvements, please ping or otherwise contact @TimNN. (Feature Requests)

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks correct to me. 👍

I only wish there were a few short comments explaining what's going on :)

Err(EMPTY) => {} // parked thread went away, try again
_ => panic!("inconsistent state in unpark"),
}
match self.inner.state.swap(NOTIFIED, SeqCst) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth adding a comment saying that even if the thread is already notified, we still want to do a write with Release semantics (at the very least).

@@ -889,7 +889,7 @@ pub fn park_timeout(dur: Duration) {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
thread.inner.state.swap(EMPTY, SeqCst);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment that why swap rather than store? It's possible that the state goes to NOTIFIED just before this line and we need to acknowledge all notifications with an Acquire load (at the very least).

@ghost
Copy link

ghost commented Sep 14, 2018

r? @alexcrichton

@alexcrichton
Copy link
Member

Thanks for the PR! I agree with @stjepang that some comments would definitely be nice to have here. I haven't been following the discussion myself but on reading this I'm not really sure why it's required. Some comments could likely enlighten me though!

regarding the synchronization.
@parched
Copy link
Contributor Author

parched commented Sep 14, 2018

@alexcrichton @stjepang Yes, sorry, I should have added some explanation, does it look alright now?

thread.inner.state.store(EMPTY, SeqCst);
// We must read again here, even though we know it will be NOTIFY,
// to synchronize with an write in `unpark` that occurred since we
// last read.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure I understand this, NOTIFY should be NOTIFIED and we're not actually synchronizing with anything in unpark but rather a write that happened before unpark?

Put another way lots of threads could be unparking this thread and wanting to send memory to it. We saw some threads via the compare_exchange above, but we're not guaranteed to see any other threads between that and the store of EMPTY. All the other threads in this small window are not notifying us, however. So we use swap here to make sure to maintain the invariant that we've synchronized with all threads who didn't notify us or the thread will notify us?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. So basically we want to establish acquire-release relationship with every unpark() that happened before this park().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexcrichton Yes, I will rewrite the comment to make that clearer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess one thing I'm not entirely sure of, doesn't compare_exchange already do the synchronization for us? When we come out the Err branch haven't we synchronized with all threads that already wrote NOTIFIED because we're for sure seeing NOTIFIED?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compare_exchange does the synchronization for us, but it's not enough.

This is the crucial part you might be overlooking: In unpark now we unconditionally write NOTIFIED (using the swap operation) every time. Suppose that a thread calls unpark just after our compare_exchange was executed.

When we clear the notification signal (go from NOTIFIED to EMPTY), we want to synchronize with all threads that have delivered a notification so far. If we don't do a swap operation to clear notifications, we will not synchronize with the thread that called unpark just after our compare_exchange.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll make the comment clearer like "since we last read NOTIFIED in the compare_exchange above".

This is quite interesting because it touches on some of the discussion in rust-lang/rfcs#2503 in that, the semantics we really want here are store-acquire or a store-load barrier which aren't currently expressible in Rust (C11) atomics without doing a write.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok that definitely makes sense to me, thanks @stjepang!

thread.inner.state.store(EMPTY, SeqCst);
// We must read again here, even though we know it will be NOTIFY,
// to synchronize with an write in `unpark` that occurred since we
// last read.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the same as the above, could this just reference the above comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will do

_ => panic!("inconsistent state in unpark"),
}
// We must unconditionally write NOTIFIED here to
// synchronize with a read in `park`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I don't quite fully understand this comment and why the loop isn't needed any more. Can you expand a bit on why this unconditional write is needed? Also is this a case like above where we're not actually synchronizing with park at all but rather we're synchronizing with reads after park returns?

Copy link

@ghost ghost Sep 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In unpark(), we want to do some kind of Release write in order to "publish" the current thread's memory, even if the other thread has already been notified.

Later on, a call to park() will do an Acquire load and be able to see that "published" memory. If we skipped the Release write in unpark() then the Acquire load wouldn't be able to see it.

Put differently, whenever a park() clears the NOTIFIED flag, we want that to happen-before (Acquire-Release) every unpark() that set the NOTIFIED flag. The point is that unpark() must set the flag to NOTIFIED even if it is already set to NOTIFIED. :)

Does that make any sense?

Copy link
Contributor Author

@parched parched Sep 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop isn't needed anymore because it is just doing a swap instead of a CAS, so there is no failure/retry case anymore. We are doing a swap because each unpark must write a value that a park reads to ensure the happens-before relationship for any writes before unpark with any reads after park returns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks for the info! This was also suspect to me because the mutex isn't protecting anything in this code path, but I think it's ok because the atomic comparison above is done inside of the mutex.

@parched
Copy link
Contributor Author

parched commented Sep 14, 2018

As a sanity check I tried the old and new versions with relacy race detector and this does indeed fix the deadlock. I'll add some more assertions to make sure I haven't broken any other properties.

@parched
Copy link
Contributor Author

parched commented Sep 15, 2018

I should add that the docs for park/unpark don't specifically say that each unpark will synchronize with a park, but I believe this was the intended behaviour and I think there is code that relies on it, e.g. tokio?

@alexcrichton
Copy link
Member

@bors: r+

This all looks great to me, thanks so much!

@bors
Copy link
Contributor

bors commented Sep 18, 2018

📌 Commit a3b8705 has been approved by alexcrichton

@bors bors added the S-waiting-on-bors Status: Waiting on bors to run and complete tests. Bors will change the label on completion. label Sep 18, 2018
@RalfJung
Copy link
Member

I agree this fixes the example.

I wonder: Now that unpark doesn't really do anything when holding the lock, why does park acquire the lock before looking at state? Couldn't it wait to acquire the lock until it knows it definitely has to sleep?

@alexcrichton
Copy link
Member

alexcrichton commented Sep 18, 2018

@RalfJung ah I was wondering the same but if the lock happened just before the sleep then it would be possible for the call to unpark() to send a wakeup notification but the park() thread never receives it because it blocks after the condvar wakeup is received. I think we need it synchronized in park() to atomically make a decision to block and then block.

@parched
Copy link
Contributor Author

parched commented Sep 19, 2018

Yes, as alex says, we need the lock covering the time between the last CAS and the wait so the notification doesn't occur before we go to sleep. A slightly different way of doing it is like #53366 (comment) but there's no advantage doing it that way. Both have the early out CAS before taking the lock, so are almost certainly going to sleep when the lock is taken (ignoring the very small chance that an unpark happens at the same time).

Something I've just realised though is, unpark doesn't actually need to hold the lock when doing the notify. It just needs to lock it and then unlock before calling notify so that it knows the parked thread is waiting. The advantage this has is when the waiting thread wakes it isn't intermediately blocked on the mutex again (see these notes).

@RalfJung
Copy link
Member

Wait, but if unpark does not acquire the lock at all, then the lock in park must be useless because it is never going to be held by any other thread...

@RalfJung
Copy link
Member

RalfJung commented Sep 19, 2018

then it would be possible for the call to unpark() to send a wakeup notification but the park() thread never receives it

Ah I see... so what must not happen is that

  • Thread A sets its state from EMPTY to PARKED.
  • Thread B changes state from PARKED to NOTIFIED, acquires the lock, wakes up thread A (NOP because the thread does not sleep yet), and returns.
  • Thread A acquires the lock and goes to sleep. Indefinitely.

So this means that (a) A needs to acquire the lock before setting the state to PARKED, and (b) B needs to at least acquire the lock (and release it again immediately?) between changing the state away from PARKED and waking up A. The purpose of this is to make sure that the thread we are unparking is either before or after the critical section in its park(), but NOT between the CAS and the wait.

I think this is worth explaining in more detail in comments in the code.

@parched
Copy link
Contributor Author

parched commented Sep 19, 2018

Yes, exactly. I will add some comments explaining that too.

@bors
Copy link
Contributor

bors commented Sep 19, 2018

⌛ Testing commit a3b8705 with merge 20dc0c5...

bors added a commit that referenced this pull request Sep 19, 2018
Fix `thread` `park`/`unpark` synchronization

Previously the code below would not be guaranteed to exit when the
second unpark took the `return, // already unparked` path because there
was no write to synchronize with a read in `park`.

EDIT: doesn't actually require third thread
```
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}
```

I have some other ideas on how to improve the performance of `park` and `unpark` using fences, avoiding any atomic RMW when the state is already `NOTIFIED`, and also how to avoid calling `notify_one` without the mutex locked. But I need to write some micro benchmarks first, so I'll submit those changes at a later date if they prove to be faster.

Fixes #53366 I hope.
@bors
Copy link
Contributor

bors commented Sep 19, 2018

☀️ Test successful - status-appveyor, status-travis
Approved by: alexcrichton
Pushing 20dc0c5 to master...

@bors bors merged commit a3b8705 into rust-lang:master Sep 19, 2018
@RalfJung
Copy link
Member

I will add some comments explaining that too.

Too late.^^

Feel free to make a new PR though, and r? me.

@parched
Copy link
Contributor Author

parched commented Sep 20, 2018

Feel free to make a new PR though, and r? me.

Ok, will do when I get the chance, maybe not till early next week though. I'll also add the optimisation to unlock the mutex immediately before notify, unless you would prefer to leave it as is?

@RalfJung
Copy link
Member

That optimization seems right to me. If you are sure as well and maybe we'll find a third person to ack, then yeah let's do that as well.

bors added a commit that referenced this pull request Oct 31, 2018
thread::unpark: Avoid notifying with mutex locked.

This means when the other thread wakes it can continue right away
instead of having to wait for the mutex.

Also add some comments explaining why the mutex needs to be locked in
the first place.

This is a follow up to #54174
I did some tests with relacy [here](https://gist.github.com/parched/b7fb88c97755a81e5cb9f9048a15f7fb) (This PR is InnerV2). If anyone can think of some other test case worth adding let me know.

r? @RalfJung
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
S-waiting-on-bors Status: Waiting on bors to run and complete tests. Bors will change the label on completion.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants