aboutsummaryrefslogtreecommitdiff
path: root/src/latch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/latch.rs')
-rw-r--r--src/latch.rs171
1 files changed, 109 insertions, 62 deletions
diff --git a/src/latch.rs b/src/latch.rs
index de43272..6c2e4fe 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -1,10 +1,11 @@
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Condvar, Mutex};
+use std::sync::Arc;
use std::usize;
use crate::registry::{Registry, WorkerThread};
+use crate::sync::{Condvar, Mutex};
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
@@ -84,13 +85,6 @@ impl CoreLatch {
}
}
- /// Returns the address of this core latch as an integer. Used
- /// for logging.
- #[inline]
- pub(super) fn addr(&self) -> usize {
- self as *const CoreLatch as usize
- }
-
/// Invoked by owning thread as it prepares to sleep. Returns true
/// if the owning thread may proceed to fall asleep, false if the
/// latch was set in the meantime.
@@ -142,6 +136,13 @@ impl CoreLatch {
}
}
+impl AsCoreLatch for CoreLatch {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ self
+ }
+}
+
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
@@ -269,62 +270,32 @@ impl Latch for LockLatch {
}
}
-/// Counting latches are used to implement scopes. They track a
-/// counter. Unlike other latches, calling `set()` does not
-/// necessarily make the latch be considered `set()`; instead, it just
-/// decrements the counter. The latch is only "set" (in the sense that
-/// `probe()` returns true) once the counter reaches zero.
+/// Once latches are used to implement one-time blocking, primarily
+/// for the termination flag of the threads in the pool.
///
-/// Note: like a `SpinLatch`, count laches are always associated with
+/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
-/// registry owns the count-latch, and that would create a cycle. So a
-/// `CountLatch` must be given a reference to its owning registry when
+/// registry owns the once-latch, and that would create a cycle. So a
+/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
-pub(super) struct CountLatch {
+pub(super) struct OnceLatch {
core_latch: CoreLatch,
- counter: AtomicUsize,
}
-impl CountLatch {
- #[inline]
- pub(super) fn new() -> CountLatch {
- Self::with_count(1)
- }
-
+impl OnceLatch {
#[inline]
- pub(super) fn with_count(n: usize) -> CountLatch {
- CountLatch {
+ pub(super) fn new() -> OnceLatch {
+ Self {
core_latch: CoreLatch::new(),
- counter: AtomicUsize::new(n),
- }
- }
-
- #[inline]
- pub(super) fn increment(&self) {
- debug_assert!(!self.core_latch.probe());
- self.counter.fetch_add(1, Ordering::Relaxed);
- }
-
- /// Decrements the latch counter by one. If this is the final
- /// count, then the latch is **set**, and calls to `probe()` will
- /// return true. Returns whether the latch was set.
- #[inline]
- pub(super) unsafe fn set(this: *const Self) -> bool {
- if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- CoreLatch::set(&(*this).core_latch);
- true
- } else {
- false
}
}
- /// Decrements the latch counter by one and possibly set it. If
- /// the latch is set, then the specific worker thread is tickled,
+ /// Set the latch, then tickle the specific worker thread,
/// which should be the one that owns this latch.
#[inline]
pub(super) unsafe fn set_and_tickle_one(
@@ -332,31 +303,81 @@ impl CountLatch {
registry: &Registry,
target_worker_index: usize,
) {
- if Self::set(this) {
+ if CoreLatch::set(&(*this).core_latch) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}
-impl AsCoreLatch for CountLatch {
+impl AsCoreLatch for OnceLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}
+/// Counting latches are used to implement scopes. They track a
+/// counter. Unlike other latches, calling `set()` does not
+/// necessarily make the latch be considered `set()`; instead, it just
+/// decrements the counter. The latch is only "set" (in the sense that
+/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
-pub(super) struct CountLockLatch {
- lock_latch: LockLatch,
+pub(super) struct CountLatch {
counter: AtomicUsize,
+ kind: CountLatchKind,
}
-impl CountLockLatch {
- #[inline]
- pub(super) fn with_count(n: usize) -> CountLockLatch {
- CountLockLatch {
- lock_latch: LockLatch::new(),
- counter: AtomicUsize::new(n),
+enum CountLatchKind {
+ /// A latch for scopes created on a rayon thread which will participate in work-
+ /// stealing while it waits for completion. This thread is not necessarily part
+ /// of the same registry as the scope itself!
+ Stealing {
+ latch: CoreLatch,
+ /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
+ /// with registry B, when a job completes in a thread of registry B, we may
+ /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
+ /// That means we need a reference to registry A (since at that point we will
+ /// only have a reference to registry B), so we stash it here.
+ registry: Arc<Registry>,
+ /// The index of the worker to wake in `registry`
+ worker_index: usize,
+ },
+
+ /// A latch for scopes created on a non-rayon thread which will block to wait.
+ Blocking { latch: LockLatch },
+}
+
+impl std::fmt::Debug for CountLatchKind {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ CountLatchKind::Stealing { latch, .. } => {
+ f.debug_tuple("Stealing").field(latch).finish()
+ }
+ CountLatchKind::Blocking { latch, .. } => {
+ f.debug_tuple("Blocking").field(latch).finish()
+ }
+ }
+ }
+}
+
+impl CountLatch {
+ pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
+ Self::with_count(1, owner)
+ }
+
+ pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
+ Self {
+ counter: AtomicUsize::new(count),
+ kind: match owner {
+ Some(owner) => CountLatchKind::Stealing {
+ latch: CoreLatch::new(),
+ registry: Arc::clone(owner.registry()),
+ worker_index: owner.index(),
+ },
+ None => CountLatchKind::Blocking {
+ latch: LockLatch::new(),
+ },
+ },
}
}
@@ -366,16 +387,42 @@ impl CountLockLatch {
debug_assert!(old_counter != 0);
}
- pub(super) fn wait(&self) {
- self.lock_latch.wait();
+ pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
+ match &self.kind {
+ CountLatchKind::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => unsafe {
+ let owner = owner.expect("owner thread");
+ debug_assert_eq!(registry.id(), owner.registry().id());
+ debug_assert_eq!(*worker_index, owner.index());
+ owner.wait_until(latch);
+ },
+ CountLatchKind::Blocking { latch } => latch.wait(),
+ }
}
}
-impl Latch for CountLockLatch {
+impl Latch for CountLatch {
#[inline]
unsafe fn set(this: *const Self) {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
- LockLatch::set(&(*this).lock_latch);
+ // NOTE: Once we call `set` on the internal `latch`,
+ // the target may proceed and invalidate `this`!
+ match (*this).kind {
+ CountLatchKind::Stealing {
+ ref latch,
+ ref registry,
+ worker_index,
+ } => {
+ let registry = Arc::clone(registry);
+ if CoreLatch::set(latch) {
+ registry.notify_worker_latch_is_set(worker_index);
+ }
+ }
+ CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
+ }
}
}
}