1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::{
AtomicU32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 5 states:
/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
const POISONED: u32 = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
/// Some thread is currently attempting to run initialization and there are threads
/// waiting for it to finish.
const QUEUED: u32 = 3;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 4;
// Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.
pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
}
impl OnceState {
#[inline]
pub fn is_poisoned(&self) -> bool {
self.poisoned
}
#[inline]
pub fn poison(&self) {
self.set_state_to.set(POISONED);
}
}
struct CompletionGuard<'a> {
state: &'a AtomicU32,
set_state_on_drop_to: u32,
}
impl<'a> Drop for CompletionGuard<'a> {
fn drop(&mut self) {
// Use release ordering to propagate changes to all threads checking
// up on the Once. `futex_wake_all` does its own synchronization, hence
// we do not need `AcqRel`.
if self.state.swap(self.set_state_on_drop_to, Release) == QUEUED {
futex_wake_all(&self.state);
}
}
}
pub struct Once {
state: AtomicU32,
}
impl Once {
#[inline]
pub const fn new() -> Once {
Once { state: AtomicU32::new(INCOMPLETE) }
}
#[inline]
pub fn is_completed(&self) -> bool {
// Use acquire ordering to make all initialization changes visible to the
// current thread.
self.state.load(Acquire) == COMPLETE
}
#[inline]
pub(crate) fn state(&mut self) -> ExclusiveState {
match *self.state.get_mut() {
INCOMPLETE => ExclusiveState::Incomplete,
POISONED => ExclusiveState::Poisoned,
COMPLETE => ExclusiveState::Complete,
_ => unreachable!("invalid Once state"),
}
}
// This uses FnMut to match the API of the generic implementation. As this
// implementation is quite light-weight, it is generic over the closure and
// so avoids the cost of dynamic dispatch.
#[cold]
#[track_caller]
pub fn call(&self, ignore_poisoning: bool, f: &mut impl FnMut(&public::OnceState)) {
let mut state = self.state.load(Acquire);
loop {
match state {
POISONED if !ignore_poisoning => {
// Panic to propagate the poison.
panic!("Once instance has previously been poisoned");
}
INCOMPLETE | POISONED => {
// Try to register the current thread as the one running.
if let Err(new) =
self.state.compare_exchange_weak(state, RUNNING, Acquire, Acquire)
{
state = new;
continue;
}
// `waiter_queue` will manage other waiting threads, and
// wake them up on drop.
let mut waiter_queue =
CompletionGuard { state: &self.state, set_state_on_drop_to: POISONED };
// Run the function, letting it know if we're poisoned or not.
let f_state = public::OnceState {
inner: OnceState {
poisoned: state == POISONED,
set_state_to: Cell::new(COMPLETE),
},
};
f(&f_state);
waiter_queue.set_state_on_drop_to = f_state.inner.set_state_to.get();
return;
}
RUNNING | QUEUED => {
// Set the state to QUEUED if it is not already.
if state == RUNNING
&& let Err(new) =
self.state.compare_exchange_weak(RUNNING, QUEUED, Relaxed, Acquire)
{
state = new;
continue;
}
futex_wait(&self.state, QUEUED, None);
state = self.state.load(Acquire);
}
COMPLETE => return,
_ => unreachable!("state is never set to invalid values"),
}
}
}
}