Fix `thread` `park`/`unpark` synchronization #54174
Previously the code below would not be guaranteed to exit when the first spawned thread took the `return, // already unparked` path because there was no write to synchronize with a read in `park`.

```
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}
```
src/libstd/thread/mod.rs
Outdated
@@ -800,7 +800,7 @@ pub fn park() {
     match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
         Ok(_) => {}
         Err(NOTIFIED) => {
-            thread.inner.state.store(EMPTY, SeqCst);
+            thread.inner.state.swap(EMPTY, SeqCst);
I was wondering if this should be a .compare_exchange().unwrap(), just to be more explicit and make some use of the read value (thus possibly reducing the chance of somebody changing this back to .store() in the future). This shouldn't be a common case so I don't imagine the performance impact would be very noticeable.
Yes I agree, I will add a comment and an assertion.
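For illustration, the two options discussed here can be sketched side by side. This is only a minimal sketch: it assumes a standalone `AtomicUsize` with hypothetical `EMPTY` and `NOTIFIED` constants rather than the real `thread.inner.state` field, and the function names are made up for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical state values; only the names mirror the diff.
const EMPTY: usize = 0;
const NOTIFIED: usize = 2;

/// Clears a pending notification with `swap` and asserts on the value read.
/// `swap` is a read-modify-write, so it reads the latest write to `state`;
/// a plain `store` would not read anything.
pub fn clear_with_swap(state: &AtomicUsize) -> usize {
    let old = state.swap(EMPTY, SeqCst);
    // Using the returned value makes it visible that the read matters.
    assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
    old
}

/// The alternative from the review: a CAS that must succeed.
pub fn clear_with_cas(state: &AtomicUsize) -> usize {
    state
        .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
        .expect("park state changed unexpectedly")
}
```

Both variants read the state as part of the clearing operation, which is the property under discussion; which one reads better in the real code is a style choice.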
This looks correct to me. 👍
I only wish there were a few short comments explaining what's going on :)
src/libstd/thread/mod.rs
Outdated
Err(EMPTY) => {} // parked thread went away, try again
_ => panic!("inconsistent state in unpark"),
}
match self.inner.state.swap(NOTIFIED, SeqCst) {
It's worth adding a comment saying that even if the thread is already notified, we still want to do a write with `Release` semantics (at the very least).
src/libstd/thread/mod.rs
Outdated
@@ -889,7 +889,7 @@ pub fn park_timeout(dur: Duration) {
     match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
         Ok(_) => {}
         Err(NOTIFIED) => {
-            thread.inner.state.store(EMPTY, SeqCst);
+            thread.inner.state.swap(EMPTY, SeqCst);
Maybe add a comment explaining why `swap` rather than `store`? It's possible that the state goes to `NOTIFIED` just before this line and we need to acknowledge all notifications with an `Acquire` load (at the very least).
Thanks for the PR! I agree with @stjepang that some comments would definitely be nice to have here. I haven't been following the discussion myself but on reading this I'm not really sure why it's required. Some comments could likely enlighten me though!
regarding the synchronization.
@alexcrichton @stjepang Yes, sorry, I should have added some explanation. Does it look alright now?
src/libstd/thread/mod.rs
Outdated
- thread.inner.state.store(EMPTY, SeqCst);
+ // We must read again here, even though we know it will be NOTIFY,
+ // to synchronize with an write in `unpark` that occurred since we
+ // last read.
To make sure I understand this, `NOTIFY` should be `NOTIFIED` and we're not actually synchronizing with anything in `unpark` but rather a write that happened before `unpark`?

Put another way, lots of threads could be unparking this thread and wanting to send memory to it. We saw some threads via the `compare_exchange` above, but we're not guaranteed to see any other threads between that and the store of `EMPTY`. All the other threads in this small window are not notifying us, however. So we use `swap` here to make sure to maintain the invariant that we've synchronized with all threads who didn't notify us or the thread will notify us?
Yes. So basically we want to establish an acquire-release relationship with every `unpark()` that happened before this `park()`.
@alexcrichton Yes, I will rewrite the comment to make that clearer.
I guess one thing I'm not entirely sure of: doesn't `compare_exchange` already do the synchronization for us? When we come out of the `Err` branch, haven't we synchronized with all threads that already wrote `NOTIFIED` because we're for sure seeing `NOTIFIED`?
`compare_exchange` does the synchronization for us, but it's not enough.

This is the crucial part you might be overlooking: in `unpark` we now unconditionally write `NOTIFIED` (using the `swap` operation) every time. Suppose that a thread calls `unpark` just after our `compare_exchange` was executed.

When we clear the notification signal (go from `NOTIFIED` to `EMPTY`), we want to synchronize with all threads that have delivered a notification so far. If we don't do a `swap` operation to clear notifications, we will not synchronize with the thread that called `unpark` just after our `compare_exchange`.
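The hand-off described above can be sketched in isolation. This is a hedged illustration, not the `std` code: `handoff_demo`, the `flag`, and the constants are hypothetical, and the orderings are reduced to the `Release`/`Acquire` minimum that the review mentions (the PR itself uses `SeqCst`).

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical state values; only the names mirror the diff.
const EMPTY: usize = 0;
const NOTIFIED: usize = 2;

/// Returns the flag value observed after consuming a notification.
pub fn handoff_demo() -> bool {
    let state = Arc::new(AtomicUsize::new(EMPTY));
    let flag = Arc::new(AtomicBool::new(false));

    let notifier = {
        let (state, flag) = (Arc::clone(&state), Arc::clone(&flag));
        thread::spawn(move || {
            flag.store(true, Ordering::Relaxed);
            // Release write: publishes everything this thread did before it.
            state.swap(NOTIFIED, Ordering::Release);
        })
    };

    // Acquire read-modify-write: once it reads NOTIFIED, it has synchronized
    // with the notifier's write, so the Relaxed flag store is visible too.
    while state.swap(EMPTY, Ordering::Acquire) != NOTIFIED {
        thread::yield_now();
    }
    let seen = flag.load(Ordering::Relaxed);
    notifier.join().unwrap();
    seen
}
```

Because every notifier uses a read-modify-write, a later notifier continues the release sequence of an earlier one, so a single acquire `swap` on the consumer side covers all notifications delivered so far. A plain `store(EMPTY)` would clear the flag without reading any of them.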
Yes, I'll make the comment clearer like "since we last read NOTIFIED in the compare_exchange above".
This is quite interesting because it touches on some of the discussion in rust-lang/rfcs#2503 in that, the semantics we really want here are store-acquire or a store-load barrier which aren't currently expressible in Rust (C11) atomics without doing a write.
Ah ok that definitely makes sense to me, thanks @stjepang!
src/libstd/thread/mod.rs
Outdated
- thread.inner.state.store(EMPTY, SeqCst);
+ // We must read again here, even though we know it will be NOTIFY,
+ // to synchronize with an write in `unpark` that occurred since we
+ // last read.
Since this is the same as the above, could this just reference the above comment?
Ok, will do
src/libstd/thread/mod.rs
Outdated
_ => panic!("inconsistent state in unpark"),
}
// We must unconditionally write NOTIFIED here to
// synchronize with a read in `park`.
I think I don't quite fully understand this comment and why the loop isn't needed any more. Can you expand a bit on why this unconditional write is needed? Also is this a case like above where we're not actually synchronizing with `park` at all but rather we're synchronizing with reads after `park` returns?
In `unpark()`, we want to do some kind of `Release` write in order to "publish" the current thread's memory, even if the other thread has already been notified.

Later on, a call to `park()` will do an `Acquire` load and be able to see that "published" memory. If we skipped the `Release` write in `unpark()` then the `Acquire` load wouldn't be guaranteed to see it.

Put differently, whenever a `park()` clears the `NOTIFIED` flag, we want every `unpark()` that set the `NOTIFIED` flag to happen-before (`Acquire`-`Release`) that clear. The point is that `unpark()` must set the flag to `NOTIFIED` even if it is already set to `NOTIFIED`. :)

Does that make any sense?
The loop isn't needed anymore because it is just doing a swap instead of a CAS, so there is no failure/retry case anymore. We are doing a swap because each `unpark` must write a value that a `park` reads to ensure the happens-before relationship for any writes before `unpark` with any reads after `park` returns.
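Putting the pieces of this thread together, the resulting state machine might look roughly like the sketch below. It is not the `std` implementation: `Parker` and `park_demo` are hypothetical names, the type is a standalone object rather than per-thread state, and `park` is assumed to be called from a single owner thread only.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::{Relaxed, SeqCst}};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;

/// Hypothetical stand-in for the per-thread park state.
pub struct Parker {
    state: AtomicUsize,
    lock: Mutex<()>,
    cvar: Condvar,
}

impl Parker {
    pub fn new() -> Self {
        Parker { state: AtomicUsize::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
    }

    /// Blocks until a notification is available, then consumes it.
    pub fn park(&self) {
        // Fast path: a notification is already pending.
        if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
            return;
        }
        let mut guard = self.lock.lock().unwrap();
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            Ok(_) => {}
            Err(NOTIFIED) => {
                // Read again with `swap` so that an `unpark` that wrote
                // NOTIFIED after the failed CAS above is observed as well.
                let old = self.state.swap(EMPTY, SeqCst);
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
                return;
            }
            Err(_) => panic!("inconsistent park state"),
        }
        loop {
            guard = self.cvar.wait(guard).unwrap();
            if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
                return; // got a notification
            }
            // Spurious wakeup: the state is still PARKED, so wait again.
        }
    }

    /// Makes a notification available, waking the parked thread if any.
    pub fn unpark(&self) {
        // Unconditional write, even if the state is already NOTIFIED, so
        // that a later `park` reads a value written by this call.
        match self.state.swap(NOTIFIED, SeqCst) {
            EMPTY | NOTIFIED => return, // nobody is waiting on the condvar
            PARKED => {}                // someone has to be woken up
            _ => panic!("inconsistent state in unpark"),
        }
        // `park` holds the lock from its CAS to PARKED until it waits, so
        // taking it here ensures the notification is not sent too early.
        let _guard = self.lock.lock().unwrap();
        self.cvar.notify_one();
    }
}

/// Mirrors the example from the PR description using the sketch above.
pub fn park_demo() -> bool {
    let parker = Arc::new(Parker::new());
    let flag = Arc::new(AtomicBool::new(false));
    let worker = {
        let (parker, flag) = (Arc::clone(&parker), Arc::clone(&flag));
        thread::spawn(move || {
            parker.unpark();
            flag.store(true, Relaxed);
            parker.unpark();
        })
    };
    while !flag.load(Relaxed) {
        parker.park();
    }
    worker.join().unwrap();
    flag.load(Relaxed)
}
```

Since `swap` cannot fail, `unpark` has no retry case, which is why the loop from the old CAS-based version disappears.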
Ok thanks for the info! This was also suspect to me because the mutex isn't protecting anything in this code path, but I think it's ok because the atomic comparison above is done inside of the mutex.
As a sanity check I tried the old and new versions with relacy race detector and this does indeed fix the deadlock. I'll add some more assertions to make sure I haven't broken any other properties.
I should add that the docs for
@bors: r+ This all looks great to me, thanks so much!
📌 Commit a3b8705 has been approved by
I agree this fixes the example. I wonder: Now that
@RalfJung ah I was wondering the same but if the lock happened just before the sleep then it would be possible for the call to
Yes, as alex says, we need the lock covering the time between the last CAS and the
Something I've just realised though is,
Wait, but if
Ah I see... so what must not happen is that
So this means that (a) A needs to acquire the lock before setting the state to
I think this is worth explaining in more detail in comments in the code.
Yes, exactly. I will add some comments explaining that too.
Fix `thread` `park`/`unpark` synchronization

Previously the code below would not be guaranteed to exit when the second unpark took the `return, // already unparked` path because there was no write to synchronize with a read in `park`.

EDIT: doesn't actually require third thread

```
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{current, spawn, park};

static FLAG: AtomicBool = AtomicBool::new(false);

fn main() {
    let thread_0 = current();
    spawn(move || {
        thread_0.unpark();
        FLAG.store(true, Ordering::Relaxed);
        thread_0.unpark();
    });

    while !FLAG.load(Ordering::Relaxed) {
        park();
    }
}
```

I have some other ideas on how to improve the performance of `park` and `unpark` using fences, avoiding any atomic RMW when the state is already `NOTIFIED`, and also how to avoid calling `notify_one` with the mutex locked. But I need to write some micro benchmarks first, so I'll submit those changes at a later date if they prove to be faster.

Fixes #53366 I hope.
☀️ Test successful - status-appveyor, status-travis
Too late.^^ Feel free to make a new PR though, and r? me.
Ok, will do when I get the chance, maybe not till early next week though. I'll also add the optimisation to unlock the mutex immediately before notify, unless you would prefer to leave it as is?
That optimization seems right to me. If you are sure as well and maybe we'll find a third person to ack, then yeah let's do that as well.
thread::unpark: Avoid notifying with mutex locked.

This means when the other thread wakes it can continue right away instead of having to wait for the mutex. Also add some comments explaining why the mutex needs to be locked in the first place.

This is a follow up to #54174. I did some tests with relacy [here](https://gist.github.com/parched/b7fb88c97755a81e5cb9f9048a15f7fb) (This PR is InnerV2). If anyone can think of some other test case worth adding let me know.

r? @RalfJung
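The follow-up idea, releasing the mutex before notifying, can be sketched with a small condvar-based signal. This is an illustration under assumptions: `Signal`, `ready`, and `notify_demo` are hypothetical names, and a boolean flag stands in for the three-state park state.

```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical one-shot signal used only to illustrate the locking pattern.
pub struct Signal {
    ready: AtomicBool,
    lock: Mutex<()>,
    cvar: Condvar,
}

impl Signal {
    pub fn new() -> Self {
        Signal { ready: AtomicBool::new(false), lock: Mutex::new(()), cvar: Condvar::new() }
    }

    /// Blocks until `notify` has been called.
    pub fn wait(&self) {
        let mut guard = self.lock.lock().unwrap();
        // The flag is checked while the lock is held, so a notifier cannot
        // slip its notification in between this check and the wait.
        while !self.ready.load(SeqCst) {
            guard = self.cvar.wait(guard).unwrap();
        }
    }

    /// Sets the flag and wakes the waiter without holding the lock.
    pub fn notify(&self) {
        self.ready.store(true, SeqCst);
        // Acquire and immediately release the lock. If the waiter already
        // checked the flag, it is guaranteed to be inside `wait` by now.
        drop(self.lock.lock().unwrap());
        // Notify with the mutex unlocked, so the woken thread can reacquire
        // it right away instead of blocking on the notifier.
        self.cvar.notify_one();
    }
}

/// Runs one waiter and one notifier; returns true if the waiter finished.
pub fn notify_demo() -> bool {
    let signal = Arc::new(Signal::new());
    let waiter = {
        let signal = Arc::clone(&signal);
        thread::spawn(move || signal.wait())
    };
    signal.notify();
    waiter.join().is_ok()
}
```

The lock still has to be taken once, for the reason discussed earlier in the thread: it orders the notifier after the waiter's check-then-wait step. Only the `notify_one` call itself moves outside the critical section.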