Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions library/std/src/sync/nonpoison/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ impl Condvar {
/// the predicate must always be checked each time this function returns to
/// protect against spurious wakeups.
///
/// # Panics
///
/// This function may [`panic!`] if it is used with more than one mutex
/// over time.
///
/// [`notify_one`]: Self::notify_one
/// [`notify_all`]: Self::notify_all
///
Expand Down
7 changes: 0 additions & 7 deletions library/std/src/sync/poison/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use crate::time::{Duration, Instant};
/// determining that a thread must block.
///
/// Functions in this module will block the current **thread** of execution.
/// Note that any attempt to use multiple mutexes on the same condition
/// variable may result in a runtime panic.
///
/// # Examples
///
Expand Down Expand Up @@ -85,11 +83,6 @@ impl Condvar {
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about [poisoning] on the [`Mutex`] type.
///
/// # Panics
///
/// This function may [`panic!`] if it is used with more than one mutex
/// over time.
///
/// [`notify_one`]: Self::notify_one
/// [`notify_all`]: Self::notify_all
/// [poisoning]: super::Mutex#poisoning
Expand Down
187 changes: 139 additions & 48 deletions library/std/src/sys/sync/condvar/pthread.rs
Original file line number Diff line number Diff line change
@@ -1,87 +1,178 @@
#![forbid(unsafe_op_in_unsafe_fn)]

use crate::mem::DropGuard;
use crate::pin::Pin;
use crate::ptr;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sync::atomic::{Atomic, AtomicUsize};
use crate::sys::pal::sync as pal;
use crate::sys::sync::{Mutex, OnceBox};
use crate::time::{Duration, Instant};

struct StateGuard<'a> {
mutex: Pin<&'a pal::Mutex>,
}

impl<'a> Drop for StateGuard<'a> {
fn drop(&mut self) {
unsafe { self.mutex.unlock() };
}
}

struct State {
mutex: pal::Mutex,
condvar: pal::Condvar,
}

impl State {
fn condvar(self: Pin<&Self>) -> Pin<&pal::Condvar> {
unsafe { self.map_unchecked(|this| &this.condvar) }
}

fn condvar_mut(self: Pin<&mut Self>) -> Pin<&mut pal::Condvar> {
unsafe { self.map_unchecked_mut(|this| &mut this.condvar) }
}

/// Locks the `mutex` field and returns a [`StateGuard`] that unlocks the
/// mutex when it is dropped.
///
/// # Safety
///
/// * The `mutex` field must not be locked by this thread.
/// * Dismissing the guard leads to undefined behaviour when this `State`
/// is dropped, as it is undefined behaviour to destroy a locked mutex.
unsafe fn lock(self: Pin<&Self>) -> StateGuard<'_> {
let mutex = unsafe { self.map_unchecked(|this| &this.mutex) };
unsafe { mutex.lock() };
StateGuard { mutex }
}
}

pub struct Condvar {
cvar: OnceBox<pal::Condvar>,
mutex: Atomic<usize>,
state: OnceBox<State>,
}

impl Condvar {
pub const fn new() -> Condvar {
Condvar { cvar: OnceBox::new(), mutex: AtomicUsize::new(0) }
Condvar { state: OnceBox::new() }
}

#[inline]
fn get(&self) -> Pin<&pal::Condvar> {
self.cvar.get_or_init(|| {
let mut cvar = Box::pin(pal::Condvar::new());
fn state(&self) -> Pin<&State> {
self.state.get_or_init(|| {
let mut state =
Box::pin(State { mutex: pal::Mutex::new(), condvar: pal::Condvar::new() });

// SAFETY: we only call `init` once per `pal::Condvar`, namely here.
unsafe { cvar.as_mut().init() };
cvar
unsafe { state.as_mut().condvar_mut().init() };
state
})
}

#[inline]
fn verify(&self, mutex: Pin<&pal::Mutex>) {
let addr = ptr::from_ref::<pal::Mutex>(&mutex).addr();
// Relaxed is okay here because we never read through `self.mutex`, and only use it to
// compare addresses.
match self.mutex.compare_exchange(0, addr, Relaxed, Relaxed) {
Ok(_) => {} // Stored the address
Err(n) if n == addr => {} // Lost a race to store the same address
_ => panic!("attempted to use a condition variable with two mutexes"),
}
}

#[inline]
pub fn notify_one(&self) {
let state = self.state();
// Notifications might be sent right after a mutex used with `wait` or
// `wait_timeout` is unlocked. Waiting until the state mutex is
// available ensures that the thread unlocking the mutex is enqueued
// on the inner condition variable, as the mutex is only unlocked
// with the state mutex held.
//
// Releasing the state mutex before issuing the notification stops
// the awakened threads from having to wait on this thread unlocking
// the mutex.
//
// SAFETY:
// The functions in this module are never called recursively, so the
// state mutex cannot be currently locked by this thread.
drop(unsafe { state.lock() });
// SAFETY: we called `init` above.
unsafe { self.get().notify_one() }
unsafe { state.condvar().notify_one() }
}

#[inline]
pub fn notify_all(&self) {
let state = self.state();
// Notifications might be sent right after a mutex used with `wait` or
// `wait_timeout` is unlocked. Waiting until the state mutex is
// available ensures that the thread unlocking the mutex is enqueued
// on the inner condition variable, as the mutex is only unlocked
// with the state mutex held.
//
// Releasing the state mutex before issuing the notification stops
// the awakened threads from having to wait on this thread unlocking
// the mutex.
//
// SAFETY:
// The functions in this module are never called recursively, so the
// state mutex cannot be currently locked by this thread.
drop(unsafe { state.lock() });
// SAFETY: we called `init` above.
unsafe { self.get().notify_all() }
unsafe { state.condvar().notify_all() }
}

#[inline]
pub unsafe fn wait(&self, mutex: &Mutex) {
// SAFETY: the caller guarantees that the lock is owned, thus the mutex
// must have been initialized already.
let mutex = unsafe { mutex.pal.get_unchecked() };
self.verify(mutex);
// SAFETY: we called `init` above, we verified that this condition
// variable is only used with `mutex` and the caller guarantees that
// `mutex` is locked by the current thread.
unsafe { self.get().wait(mutex) }
let state = self.state();

// Ensure that the mutex is locked when this function returns or panics.
// The relocking must occur after the state lock is unlocked to prevent
// deadlocks, hence we scope the relock guard before the state lock guard.
let relock;

// Lock the state mutex before unlocking `mutex` to ensure that
// notifications occurring before this thread is enqueued on the
// condvar are not missed.
//
// SAFETY:
// The functions in this module are never called recursively, so the
// state mutex cannot be currently locked by this thread.
let guard = unsafe { state.lock() };

// SAFETY:
// The caller must guarantee that `mutex` is currently locked by this
// thread.
unsafe { mutex.unlock() };
relock = DropGuard::new(mutex, |mutex| mutex.lock());

// SAFETY:
// * `init` was called above
// * the condition variable is only ever used with the state mutex
// * the state mutex was locked above
unsafe { state.condvar().wait(guard.mutex) };
}

pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
// SAFETY: the caller guarantees that the lock is owned, thus the mutex
// must have been initialized already.
let mutex = unsafe { mutex.pal.get_unchecked() };
self.verify(mutex);
let state = self.state();

// Ensure that the mutex is locked when this function returns or panics.
// The relocking must occur after the state lock is unlocked to prevent
// deadlocks, hence we scope the relock guard before the state lock guard.
let relock;

// Lock the state mutex before unlocking `mutex` to ensure that
// notifications occurring before this thread is enqueued on the
// condvar are not missed.
//
// SAFETY:
// The functions in this module are never called recursively, so the
// state mutex cannot be currently locked by this thread.
let guard = unsafe { state.lock() };

// SAFETY:
// The caller must guarantee that `mutex` is currently locked by this
// thread.
unsafe { mutex.unlock() };
relock = DropGuard::new(mutex, |mutex| mutex.lock());

if pal::Condvar::PRECISE_TIMEOUT {
// SAFETY: we called `init` above, we verified that this condition
// variable is only used with `mutex` and the caller guarantees that
// `mutex` is locked by the current thread.
unsafe { self.get().wait_timeout(mutex, dur) }
// SAFETY:
// * `init` was called above
// * the condition variable is only ever used with the state mutex
// * the state mutex was locked above
unsafe { state.condvar().wait_timeout(guard.mutex, dur) }
} else {
// Timeout reports are not reliable, so do the check ourselves.
let now = Instant::now();
// SAFETY: we called `init` above, we verified that this condition
// variable is only used with `mutex` and the caller guarantees that
// `mutex` is locked by the current thread.
let woken = unsafe { self.get().wait_timeout(mutex, dur) };
// SAFETY:
// * `init` was called above
// * the condition variable is only ever used with the state mutex
// * the state mutex was locked above
let woken = unsafe { state.condvar().wait_timeout(guard.mutex, dur) };
woken || now.elapsed() < dur
}
}
Expand Down
2 changes: 1 addition & 1 deletion library/std/src/sys/sync/mutex/pthread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sys::pal::sync as pal;
use crate::sys::sync::OnceBox;

pub struct Mutex {
pub(in crate::sys::sync) pal: OnceBox<pal::Mutex>,
pal: OnceBox<pal::Mutex>,
}

impl Mutex {
Expand Down
Loading