aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakub Kotur <qtr@google.com>2021-03-16 19:18:12 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-03-16 19:18:12 +0000
commit7c5f66a679b75a8325d0775df538492947a1bd0d (patch)
treef3b875c61b673490a50fc6a7e3a578c27e9c78c6
parentcc6ff67f9992c7f991228396a897677a0def9a78 (diff)
parent52de626812dfb44fcf1aad50fd4be8e8b905622e (diff)
downloadrayon-core-7c5f66a679b75a8325d0775df538492947a1bd0d.tar.gz
Initial import of rayon-core-1.9.0. am: 3b7f317415 am: 52de626812
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/rayon-core/+/1621088 Change-Id: Ieb0634b57000e13d81af6da0b9296d3080af173d
-rw-r--r--.cargo_vcs_info.json5
-rw-r--r--Cargo.toml75
-rw-r--r--Cargo.toml.orig58
-rw-r--r--README.md11
-rw-r--r--build.rs7
-rw-r--r--src/compile_fail/mod.rs7
-rw-r--r--src/compile_fail/quicksort_race1.rs28
-rw-r--r--src/compile_fail/quicksort_race2.rs28
-rw-r--r--src/compile_fail/quicksort_race3.rs28
-rw-r--r--src/compile_fail/rc_return.rs21
-rw-r--r--src/compile_fail/rc_upvar.rs11
-rw-r--r--src/compile_fail/scope_join_bad.rs24
-rw-r--r--src/job.rs217
-rw-r--r--src/join/mod.rs187
-rw-r--r--src/join/test.rs145
-rw-r--r--src/latch.rs339
-rw-r--r--src/lib.rs758
-rw-r--r--src/log.rs423
-rw-r--r--src/private.rs26
-rw-r--r--src/registry.rs925
-rw-r--r--src/scope/mod.rs630
-rw-r--r--src/scope/test.rs515
-rw-r--r--src/sleep/README.md219
-rw-r--r--src/sleep/counters.rs271
-rw-r--r--src/sleep/mod.rs392
-rw-r--r--src/spawn/mod.rs168
-rw-r--r--src/spawn/test.rs243
-rw-r--r--src/test.rs195
-rw-r--r--src/thread_pool/mod.rs315
-rw-r--r--src/thread_pool/test.rs338
-rw-r--r--src/unwind.rs31
-rw-r--r--src/util.rs10
-rw-r--r--tests/double_init_fail.rs14
-rw-r--r--tests/init_zero_threads.rs9
-rw-r--r--tests/scope_join.rs45
-rw-r--r--tests/scoped_threadpool.rs96
-rw-r--r--tests/simple_panic.rs7
-rw-r--r--tests/stack_overflow_crash.rs82
38 files changed, 6903 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..c28c857
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+ "git": {
+ "sha1": "dc13cb7875ad43c7d1ea8b1e504b09c031f7ed5a"
+ }
+}
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..1f825b6
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,75 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "rayon-core"
+version = "1.9.0"
+authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
+build = "build.rs"
+links = "rayon-core"
+description = "Core APIs for Rayon"
+documentation = "https://docs.rs/rayon/"
+readme = "README.md"
+keywords = ["parallel", "thread", "concurrency", "join", "performance"]
+categories = ["concurrency"]
+license = "Apache-2.0/MIT"
+repository = "https://github.com/rayon-rs/rayon"
+
+[[test]]
+name = "stack_overflow_crash"
+path = "tests/stack_overflow_crash.rs"
+harness = false
+
+[[test]]
+name = "double_init_fail"
+path = "tests/double_init_fail.rs"
+
+[[test]]
+name = "init_zero_threads"
+path = "tests/init_zero_threads.rs"
+
+[[test]]
+name = "scope_join"
+path = "tests/scope_join.rs"
+
+[[test]]
+name = "simple_panic"
+path = "tests/simple_panic.rs"
+
+[[test]]
+name = "scoped_threadpool"
+path = "tests/scoped_threadpool.rs"
+[dependencies.crossbeam-channel]
+version = "0.5.0"
+
+[dependencies.crossbeam-deque]
+version = "0.8.0"
+
+[dependencies.crossbeam-utils]
+version = "0.8.0"
+
+[dependencies.lazy_static]
+version = "1"
+
+[dependencies.num_cpus]
+version = "1.2"
+[dev-dependencies.rand]
+version = "0.7"
+
+[dev-dependencies.rand_xorshift]
+version = "0.2"
+
+[dev-dependencies.scoped-tls]
+version = "1.0"
+[target."cfg(unix)".dev-dependencies.libc]
+version = "0.2"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..b2082e4
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,58 @@
+[package]
+name = "rayon-core"
+version = "1.9.0" # reminder to update html_root_url attribute
+authors = ["Niko Matsakis <niko@alum.mit.edu>",
+ "Josh Stone <cuviper@gmail.com>"]
+description = "Core APIs for Rayon"
+license = "Apache-2.0/MIT"
+repository = "https://github.com/rayon-rs/rayon"
+documentation = "https://docs.rs/rayon/"
+edition = "2018"
+links = "rayon-core"
+build = "build.rs"
+readme = "README.md"
+keywords = ["parallel", "thread", "concurrency", "join", "performance"]
+categories = ["concurrency"]
+
+# Some dependencies may not be their latest version, in order to support older rustc.
+[dependencies]
+num_cpus = "1.2"
+lazy_static = "1"
+crossbeam-channel = "0.5.0"
+crossbeam-deque = "0.8.0"
+crossbeam-utils = "0.8.0"
+
+[dev-dependencies]
+rand = "0.7"
+rand_xorshift = "0.2"
+scoped-tls = "1.0"
+
+[target.'cfg(unix)'.dev-dependencies]
+libc = "0.2"
+
+[[test]]
+name = "stack_overflow_crash"
+path = "tests/stack_overflow_crash.rs"
+harness = false
+
+# NB: having one [[test]] manually defined means we need to declare them all
+
+[[test]]
+name = "double_init_fail"
+path = "tests/double_init_fail.rs"
+
+[[test]]
+name = "init_zero_threads"
+path = "tests/init_zero_threads.rs"
+
+[[test]]
+name = "scope_join"
+path = "tests/scope_join.rs"
+
+[[test]]
+name = "simple_panic"
+path = "tests/simple_panic.rs"
+
+[[test]]
+name = "scoped_threadpool"
+path = "tests/scoped_threadpool.rs"
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0c49362
--- /dev/null
+++ b/README.md
@@ -0,0 +1,11 @@
+Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool.
+
+Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
+
+rayon-core aims to never, or almost never, have a breaking change to its API, because each revision of rayon-core also houses the global thread-pool (and hence if you have two simultaneous versions of rayon-core, you have two thread-pools).
+
+Please see [Rayon Docs] for details about using Rayon.
+
+[Rayon Docs]: https://docs.rs/rayon/
+
+Rayon-core currently requires `rustc 1.36.0` or greater.
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..8771b63
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,7 @@
+// We need a build script to use `link = "rayon-core"`. But we're not
+// *actually* linking to anything, just making sure that we're the only
+// rayon-core in use.
+fn main() {
+ // we don't need to rebuild for anything else
+ println!("cargo:rerun-if-changed=build.rs");
+}
diff --git a/src/compile_fail/mod.rs b/src/compile_fail/mod.rs
new file mode 100644
index 0000000..f2ec646
--- /dev/null
+++ b/src/compile_fail/mod.rs
@@ -0,0 +1,7 @@
+// These modules contain `compile_fail` doc tests.
+mod quicksort_race1;
+mod quicksort_race2;
+mod quicksort_race3;
+mod rc_return;
+mod rc_upvar;
+mod scope_join_bad;
diff --git a/src/compile_fail/quicksort_race1.rs b/src/compile_fail/quicksort_race1.rs
new file mode 100644
index 0000000..5615033
--- /dev/null
+++ b/src/compile_fail/quicksort_race1.rs
@@ -0,0 +1,28 @@
+/*! ```compile_fail,E0524
+
+fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
+ if v.len() <= 1 {
+ return;
+ }
+
+ let mid = partition(v);
+ let (lo, _hi) = v.split_at_mut(mid);
+ rayon_core::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR
+}
+
+fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
+ let pivot = v.len() - 1;
+ let mut i = 0;
+ for j in 0..pivot {
+ if v[j] <= v[pivot] {
+ v.swap(i, j);
+ i += 1;
+ }
+ }
+ v.swap(i, pivot);
+ i
+}
+
+fn main() { }
+
+``` */
diff --git a/src/compile_fail/quicksort_race2.rs b/src/compile_fail/quicksort_race2.rs
new file mode 100644
index 0000000..020589c
--- /dev/null
+++ b/src/compile_fail/quicksort_race2.rs
@@ -0,0 +1,28 @@
+/*! ```compile_fail,E0500
+
+fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
+ if v.len() <= 1 {
+ return;
+ }
+
+ let mid = partition(v);
+ let (lo, _hi) = v.split_at_mut(mid);
+ rayon_core::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR
+}
+
+fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
+ let pivot = v.len() - 1;
+ let mut i = 0;
+ for j in 0..pivot {
+ if v[j] <= v[pivot] {
+ v.swap(i, j);
+ i += 1;
+ }
+ }
+ v.swap(i, pivot);
+ i
+}
+
+fn main() { }
+
+``` */
diff --git a/src/compile_fail/quicksort_race3.rs b/src/compile_fail/quicksort_race3.rs
new file mode 100644
index 0000000..16fbf3b
--- /dev/null
+++ b/src/compile_fail/quicksort_race3.rs
@@ -0,0 +1,28 @@
+/*! ```compile_fail,E0524
+
+fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
+ if v.len() <= 1 {
+ return;
+ }
+
+ let mid = partition(v);
+ let (_lo, hi) = v.split_at_mut(mid);
+ rayon_core::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR
+}
+
+fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
+ let pivot = v.len() - 1;
+ let mut i = 0;
+ for j in 0..pivot {
+ if v[j] <= v[pivot] {
+ v.swap(i, j);
+ i += 1;
+ }
+ }
+ v.swap(i, pivot);
+ i
+}
+
+fn main() { }
+
+``` */
diff --git a/src/compile_fail/rc_return.rs b/src/compile_fail/rc_return.rs
new file mode 100644
index 0000000..164f8ce
--- /dev/null
+++ b/src/compile_fail/rc_return.rs
@@ -0,0 +1,21 @@
+/** ```compile_fail,E0277
+
+use std::rc::Rc;
+
+fn main() {
+ rayon_core::join(|| Rc::new(22), || ()); //~ ERROR
+}
+
+``` */
+mod left {}
+
+/** ```compile_fail,E0277
+
+use std::rc::Rc;
+
+fn main() {
+ rayon_core::join(|| (), || Rc::new(23)); //~ ERROR
+}
+
+``` */
+mod right {}
diff --git a/src/compile_fail/rc_upvar.rs b/src/compile_fail/rc_upvar.rs
new file mode 100644
index 0000000..62895bf
--- /dev/null
+++ b/src/compile_fail/rc_upvar.rs
@@ -0,0 +1,11 @@
+/*! ```compile_fail,E0277
+
+use std::rc::Rc;
+
+fn main() {
+ let r = Rc::new(22);
+ rayon_core::join(|| r.clone(), || r.clone());
+ //~^ ERROR
+}
+
+``` */
diff --git a/src/compile_fail/scope_join_bad.rs b/src/compile_fail/scope_join_bad.rs
new file mode 100644
index 0000000..75e4c5c
--- /dev/null
+++ b/src/compile_fail/scope_join_bad.rs
@@ -0,0 +1,24 @@
+/*! ```compile_fail,E0373
+
+fn bad_scope<F>(f: F)
+ where F: FnOnce(&i32) + Send,
+{
+ rayon_core::scope(|s| {
+ let x = 22;
+ s.spawn(|_| f(&x)); //~ ERROR `x` does not live long enough
+ });
+}
+
+fn good_scope<F>(f: F)
+ where F: FnOnce(&i32) + Send,
+{
+ let x = 22;
+ rayon_core::scope(|s| {
+ s.spawn(|_| f(&x));
+ });
+}
+
+fn main() {
+}
+
+``` */
diff --git a/src/job.rs b/src/job.rs
new file mode 100644
index 0000000..a71f1b0
--- /dev/null
+++ b/src/job.rs
@@ -0,0 +1,217 @@
+use crate::latch::Latch;
+use crate::unwind;
+use crossbeam_deque::{Injector, Steal};
+use std::any::Any;
+use std::cell::UnsafeCell;
+use std::mem;
+
+pub(super) enum JobResult<T> {
+ None,
+ Ok(T),
+ Panic(Box<dyn Any + Send>),
+}
+
+/// A `Job` is used to advertise work for other threads that they may
+/// want to steal. In accordance with time honored tradition, jobs are
+/// arranged in a deque, so that thieves can take from the top of the
+/// deque while the main worker manages the bottom of the deque. This
+/// deque is managed by the `thread_pool` module.
+pub(super) trait Job {
+ /// Unsafe: this may be called from a different thread than the one
+ /// which scheduled the job, so the implementer must ensure the
+ /// appropriate traits are met, whether `Send`, `Sync`, or both.
+ unsafe fn execute(this: *const Self);
+}
+
+/// Effectively a Job trait object. Each JobRef **must** be executed
+/// exactly once, or else data may leak.
+///
+/// Internally, we store the job's data in a `*const ()` pointer. The
+/// true type is something like `*const StackJob<...>`, but we hide
+/// it. We also carry the "execute fn" from the `Job` trait.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub(super) struct JobRef {
+ pointer: *const (),
+ execute_fn: unsafe fn(*const ()),
+}
+
+unsafe impl Send for JobRef {}
+unsafe impl Sync for JobRef {}
+
+impl JobRef {
+ /// Unsafe: caller asserts that `data` will remain valid until the
+ /// job is executed.
+ pub(super) unsafe fn new<T>(data: *const T) -> JobRef
+ where
+ T: Job,
+ {
+ let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
+
+ // erase types:
+ JobRef {
+ pointer: data as *const (),
+ execute_fn: mem::transmute(fn_ptr),
+ }
+ }
+
+ #[inline]
+ pub(super) unsafe fn execute(&self) {
+ (self.execute_fn)(self.pointer)
+ }
+}
+
+/// A job that will be owned by a stack slot. This means that when it
+/// executes it need not free any heap data, the cleanup occurs when
+/// the stack frame is later popped. The function parameter indicates
+/// `true` if the job was stolen -- executed on a different thread.
+pub(super) struct StackJob<L, F, R>
+where
+ L: Latch + Sync,
+ F: FnOnce(bool) -> R + Send,
+ R: Send,
+{
+ pub(super) latch: L,
+ func: UnsafeCell<Option<F>>,
+ result: UnsafeCell<JobResult<R>>,
+}
+
+impl<L, F, R> StackJob<L, F, R>
+where
+ L: Latch + Sync,
+ F: FnOnce(bool) -> R + Send,
+ R: Send,
+{
+ pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
+ StackJob {
+ latch,
+ func: UnsafeCell::new(Some(func)),
+ result: UnsafeCell::new(JobResult::None),
+ }
+ }
+
+ pub(super) unsafe fn as_job_ref(&self) -> JobRef {
+ JobRef::new(self)
+ }
+
+ pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
+ self.func.into_inner().unwrap()(stolen)
+ }
+
+ pub(super) unsafe fn into_result(self) -> R {
+ self.result.into_inner().into_return_value()
+ }
+}
+
+impl<L, F, R> Job for StackJob<L, F, R>
+where
+ L: Latch + Sync,
+ F: FnOnce(bool) -> R + Send,
+ R: Send,
+{
+ unsafe fn execute(this: *const Self) {
+ fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
+ move || func(true)
+ }
+
+ let this = &*this;
+ let abort = unwind::AbortIfPanic;
+ let func = (*this.func.get()).take().unwrap();
+ (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
+ Ok(x) => JobResult::Ok(x),
+ Err(x) => JobResult::Panic(x),
+ };
+ this.latch.set();
+ mem::forget(abort);
+ }
+}
+
+/// Represents a job stored in the heap. Used to implement
+/// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
+/// invokes a closure, which then triggers the appropriate logic to
+/// signal that the job executed.
+///
+/// (Probably `StackJob` should be refactored in a similar fashion.)
+pub(super) struct HeapJob<BODY>
+where
+ BODY: FnOnce() + Send,
+{
+ job: UnsafeCell<Option<BODY>>,
+}
+
+impl<BODY> HeapJob<BODY>
+where
+ BODY: FnOnce() + Send,
+{
+ pub(super) fn new(func: BODY) -> Self {
+ HeapJob {
+ job: UnsafeCell::new(Some(func)),
+ }
+ }
+
+ /// Creates a `JobRef` from this job -- note that this hides all
+ /// lifetimes, so it is up to you to ensure that this JobRef
+ /// doesn't outlive any data that it closes over.
+ pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
+ let this: *const Self = mem::transmute(self);
+ JobRef::new(this)
+ }
+}
+
+impl<BODY> Job for HeapJob<BODY>
+where
+ BODY: FnOnce() + Send,
+{
+ unsafe fn execute(this: *const Self) {
+ let this: Box<Self> = mem::transmute(this);
+ let job = (*this.job.get()).take().unwrap();
+ job();
+ }
+}
+
+impl<T> JobResult<T> {
+ /// Convert the `JobResult` for a job that has finished (and hence
+ /// its JobResult is populated) into its return value.
+ ///
+ /// NB. This will panic if the job panicked.
+ pub(super) fn into_return_value(self) -> T {
+ match self {
+ JobResult::None => unreachable!(),
+ JobResult::Ok(x) => x,
+ JobResult::Panic(x) => unwind::resume_unwinding(x),
+ }
+ }
+}
+
+/// Indirect queue to provide FIFO job priority.
+pub(super) struct JobFifo {
+ inner: Injector<JobRef>,
+}
+
+impl JobFifo {
+ pub(super) fn new() -> Self {
+ JobFifo {
+ inner: Injector::new(),
+ }
+ }
+
+ pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
+ // A little indirection ensures that spawns are always prioritized in FIFO order. The
+ // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
+ // (FIFO), but either way they will end up popping from the front of this queue.
+ self.inner.push(job_ref);
+ JobRef::new(self)
+ }
+}
+
+impl Job for JobFifo {
+ unsafe fn execute(this: *const Self) {
+ // We "execute" a queue by executing its first job, FIFO.
+ loop {
+ match (*this).inner.steal() {
+ Steal::Success(job_ref) => break job_ref.execute(),
+ Steal::Empty => panic!("FIFO is empty"),
+ Steal::Retry => {}
+ }
+ }
+ }
+}
diff --git a/src/join/mod.rs b/src/join/mod.rs
new file mode 100644
index 0000000..d72c7e6
--- /dev/null
+++ b/src/join/mod.rs
@@ -0,0 +1,187 @@
+use crate::job::StackJob;
+use crate::latch::SpinLatch;
+use crate::registry::{self, WorkerThread};
+use crate::unwind;
+use std::any::Any;
+
+use crate::FnContext;
+
+#[cfg(test)]
+mod test;
+
+/// Takes two closures and *potentially* runs them in parallel. It
+/// returns a pair of the results from those closures.
+///
+/// Conceptually, calling `join()` is similar to spawning two threads,
+/// one executing each of the two closures. However, the
+/// implementation is quite different and incurs very low
+/// overhead. The underlying technique is called "work stealing": the
+/// Rayon runtime uses a fixed pool of worker threads and attempts to
+/// only execute code in parallel when there are idle CPUs to handle
+/// it.
+///
+/// When `join` is called from outside the thread pool, the calling
+/// thread will block while the closures execute in the pool. When
+/// `join` is called within the pool, the calling thread still actively
+/// participates in the thread pool. It will begin by executing closure
+/// A (on the current thread). While it is doing that, it will advertise
+/// closure B as being available for other threads to execute. Once closure A
+/// has completed, the current thread will try to execute closure B;
+/// if however closure B has been stolen, then it will look for other work
+/// while waiting for the thief to fully execute closure B. (This is the
+/// typical work-stealing strategy).
+///
+/// # Examples
+///
+/// This example uses join to perform a quick-sort (note this is not a
+/// particularly optimized implementation: if you **actually** want to
+/// sort for real, you should prefer [the `par_sort` method] offered
+/// by Rayon).
+///
+/// [the `par_sort` method]: ../rayon/slice/trait.ParallelSliceMut.html#method.par_sort
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let mut v = vec![5, 1, 8, 22, 0, 44];
+/// quick_sort(&mut v);
+/// assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
+///
+/// fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
+/// if v.len() > 1 {
+/// let mid = partition(v);
+/// let (lo, hi) = v.split_at_mut(mid);
+/// rayon::join(|| quick_sort(lo),
+/// || quick_sort(hi));
+/// }
+/// }
+///
+/// // Partition rearranges all items `<=` to the pivot
+/// // item (arbitrary selected to be the last item in the slice)
+/// // to the first half of the slice. It then returns the
+/// // "dividing point" where the pivot is placed.
+/// fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
+/// let pivot = v.len() - 1;
+/// let mut i = 0;
+/// for j in 0..pivot {
+/// if v[j] <= v[pivot] {
+/// v.swap(i, j);
+/// i += 1;
+/// }
+/// }
+/// v.swap(i, pivot);
+/// i
+/// }
+/// ```
+///
+/// # Warning about blocking I/O
+///
+/// The assumption is that the closures given to `join()` are
+/// CPU-bound tasks that do not perform I/O or other blocking
+/// operations. If you do perform I/O, and that I/O should block
+/// (e.g., waiting for a network request), the overall performance may
+/// be poor. Moreover, if you cause one closure to be blocked waiting
+/// on another (for example, using a channel), that could lead to a
+/// deadlock.
+///
+/// # Panics
+///
+/// No matter what happens, both closures will always be executed. If
+/// a single closure panics, whether it be the first or second
+/// closure, that panic will be propagated and hence `join()` will
+/// panic with the same panic value. If both closures panic, `join()`
+/// will panic with the panic value from the first closure.
+pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
+where
+ A: FnOnce() -> RA + Send,
+ B: FnOnce() -> RB + Send,
+ RA: Send,
+ RB: Send,
+{
+ #[inline]
+ fn call<R>(f: impl FnOnce() -> R) -> impl FnOnce(FnContext) -> R {
+ move |_| f()
+ }
+
+ join_context(call(oper_a), call(oper_b))
+}
+
+/// Identical to `join`, except that the closures have a parameter
+/// that provides context for the way the closure has been called,
+/// especially indicating whether they're executing on a different
+/// thread than where `join_context` was called. This will occur if
+/// the second job is stolen by a different thread, or if
+/// `join_context` was called from outside the thread pool to begin
+/// with.
+pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
+where
+ A: FnOnce(FnContext) -> RA + Send,
+ B: FnOnce(FnContext) -> RB + Send,
+ RA: Send,
+ RB: Send,
+{
+ #[inline]
+ fn call_a<R>(f: impl FnOnce(FnContext) -> R, injected: bool) -> impl FnOnce() -> R {
+ move || f(FnContext::new(injected))
+ }
+
+ #[inline]
+ fn call_b<R>(f: impl FnOnce(FnContext) -> R) -> impl FnOnce(bool) -> R {
+ move |migrated| f(FnContext::new(migrated))
+ }
+
+ registry::in_worker(|worker_thread, injected| unsafe {
+ // Create virtual wrapper for task b; this all has to be
+ // done here so that the stack frame can keep it all live
+ // long enough.
+ let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
+ let job_b_ref = job_b.as_job_ref();
+ worker_thread.push(job_b_ref);
+
+ // Execute task a; hopefully b gets stolen in the meantime.
+ let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
+ let result_a = match status_a {
+ Ok(v) => v,
+ Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
+ };
+
+ // Now that task A has finished, try to pop job B from the
+ // local stack. It may already have been popped by job A; it
+ // may also have been stolen. There may also be some tasks
+ // pushed on top of it in the stack, and we will have to pop
+ // those off to get to it.
+ while !job_b.latch.probe() {
+ if let Some(job) = worker_thread.take_local_job() {
+ if job == job_b_ref {
+ // Found it! Let's run it.
+ //
+ // Note that this could panic, but it's ok if we unwind here.
+ let result_b = job_b.run_inline(injected);
+ return (result_a, result_b);
+ } else {
+ worker_thread.execute(job);
+ }
+ } else {
+ // Local deque is empty. Time to steal from other
+ // threads.
+ worker_thread.wait_until(&job_b.latch);
+ debug_assert!(job_b.latch.probe());
+ break;
+ }
+ }
+
+ (result_a, job_b.into_result())
+ })
+}
+
+/// If job A panics, we still cannot return until we are sure that job
+/// B is complete. This is because it may contain references into the
+/// enclosing stack frame(s).
+#[cold] // cold path
+unsafe fn join_recover_from_panic(
+ worker_thread: &WorkerThread,
+ job_b_latch: &SpinLatch<'_>,
+ err: Box<dyn Any + Send>,
+) -> ! {
+ worker_thread.wait_until(job_b_latch);
+ unwind::resume_unwinding(err)
+}
diff --git a/src/join/test.rs b/src/join/test.rs
new file mode 100644
index 0000000..e7f287f
--- /dev/null
+++ b/src/join/test.rs
@@ -0,0 +1,145 @@
+//! Tests for the join code.
+
+use crate::join::*;
+use crate::unwind;
+use crate::ThreadPoolBuilder;
+use rand::distributions::Standard;
+use rand::{Rng, SeedableRng};
+use rand_xorshift::XorShiftRng;
+
+fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
+ if v.len() <= 1 {
+ return;
+ }
+
+ let mid = partition(v);
+ let (lo, hi) = v.split_at_mut(mid);
+ join(|| quick_sort(lo), || quick_sort(hi));
+}
+
+fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
+ let pivot = v.len() - 1;
+ let mut i = 0;
+ for j in 0..pivot {
+ if v[j] <= v[pivot] {
+ v.swap(i, j);
+ i += 1;
+ }
+ }
+ v.swap(i, pivot);
+ i
+}
+
+fn seeded_rng() -> XorShiftRng {
+ let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
+ (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
+ XorShiftRng::from_seed(seed)
+}
+
+#[test]
+fn sort() {
+ let rng = seeded_rng();
+ let mut data: Vec<u32> = rng.sample_iter(&Standard).take(6 * 1024).collect();
+ let mut sorted_data = data.clone();
+ sorted_data.sort();
+ quick_sort(&mut data);
+ assert_eq!(data, sorted_data);
+}
+
+#[test]
+fn sort_in_pool() {
+ let rng = seeded_rng();
+ let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
+
+ let pool = ThreadPoolBuilder::new().build().unwrap();
+ let mut sorted_data = data.clone();
+ sorted_data.sort();
+ pool.install(|| quick_sort(&mut data));
+ assert_eq!(data, sorted_data);
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_a() {
+ join(|| panic!("Hello, world!"), || ());
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_b() {
+ join(|| (), || panic!("Hello, world!"));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_both() {
+ join(|| panic!("Hello, world!"), || panic!("Goodbye, world!"));
+}
+
+#[test]
+fn panic_b_still_executes() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
+ Ok(_) => panic!("failed to propagate panic from closure A,"),
+ Err(_) => assert!(x, "closure b failed to execute"),
+ }
+}
+
+#[test]
+fn join_context_both() {
+ // If we're not in a pool, both should be marked stolen as they're injected.
+ let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
+ assert!(a_migrated);
+ assert!(b_migrated);
+}
+
+#[test]
+fn join_context_neither() {
+ // If we're already in a 1-thread pool, neither job should be stolen.
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (a_migrated, b_migrated) =
+ pool.install(|| join_context(|a| a.migrated(), |b| b.migrated()));
+ assert!(!a_migrated);
+ assert!(!b_migrated);
+}
+
+#[test]
+fn join_context_second() {
+ use std::sync::Barrier;
+
+ // If we're already in a 2-thread pool, the second job should be stolen.
+ let barrier = Barrier::new(2);
+ let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+ let (a_migrated, b_migrated) = pool.install(|| {
+ join_context(
+ |a| {
+ barrier.wait();
+ a.migrated()
+ },
+ |b| {
+ barrier.wait();
+ b.migrated()
+ },
+ )
+ });
+ assert!(!a_migrated);
+ assert!(b_migrated);
+}
+
+#[test]
+fn join_counter_overflow() {
+ const MAX: u32 = 500_000;
+
+ let mut i = 0;
+ let mut j = 0;
+ let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+
+ // Hammer on join a bunch of times -- used to hit overflow debug-assertions
+ // in JEC on 32-bit targets: https://github.com/rayon-rs/rayon/issues/797
+ for _ in 0..MAX {
+ pool.join(|| i += 1, || j += 1);
+ }
+
+ assert_eq!(i, MAX);
+ assert_eq!(j, MAX);
+}
diff --git a/src/latch.rs b/src/latch.rs
new file mode 100644
index 0000000..0965bb9
--- /dev/null
+++ b/src/latch.rs
@@ -0,0 +1,339 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Condvar, Mutex};
+use std::usize;
+
+use crate::registry::{Registry, WorkerThread};
+
+/// We define various kinds of latches, which are all a primitive signaling
+/// mechanism. A latch starts as false. Eventually someone calls `set()` and
+/// it becomes true. You can test if it has been set by calling `probe()`.
+///
+/// Some kinds of latches, but not all, support a `wait()` operation
+/// that will wait until the latch is set, blocking efficiently. That
+/// is not part of the trait since it is not possibly to do with all
+/// latches.
+///
+/// The intention is that `set()` is called once, but `probe()` may be
+/// called any number of times. Once `probe()` returns true, the memory
+/// effects that occurred before `set()` become visible.
+///
+/// It'd probably be better to refactor the API into two paired types,
+/// but that's a bit of work, and this is not a public API.
+///
+/// ## Memory ordering
+///
+/// Latches need to guarantee two things:
+///
+/// - Once `probe()` returns true, all memory effects from the `set()`
+/// are visible (in other words, the set should synchronize-with
+/// the probe).
+/// - Once `set()` occurs, the next `probe()` *will* observe it. This
+/// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
+/// README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
+pub(super) trait Latch {
+ /// Set the latch, signalling others.
+ ///
+ /// # WARNING
+ ///
+ /// Setting a latch triggers other threads to wake up and (in some
+ /// cases) complete. This may, in turn, cause memory to be
+ /// allocated and so forth. One must be very careful about this,
+ /// and it's typically better to read all the fields you will need
+ /// to access *before* a latch is set!
+ fn set(&self);
+}
+
+pub(super) trait AsCoreLatch {
+ fn as_core_latch(&self) -> &CoreLatch;
+}
+
+/// Latch is not set, owning thread is awake
+const UNSET: usize = 0;
+
+/// Latch is not set, owning thread is going to sleep on this latch
+/// (but has not yet fallen asleep).
+const SLEEPY: usize = 1;
+
+/// Latch is not set, owning thread is asleep on this latch and
+/// must be awoken.
+const SLEEPING: usize = 2;
+
+/// Latch is set.
+const SET: usize = 3;
+
+/// Spin latches are the simplest, most efficient kind, but they do
+/// not support a `wait()` operation. They just have a boolean flag
+/// that becomes true when `set()` is called.
+#[derive(Debug)]
+pub(super) struct CoreLatch {
+ state: AtomicUsize,
+}
+
+impl CoreLatch {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ state: AtomicUsize::new(0),
+ }
+ }
+
+ /// Returns the address of this core latch as an integer. Used
+ /// for logging.
+ #[inline]
+ pub(super) fn addr(&self) -> usize {
+ self as *const CoreLatch as usize
+ }
+
+ /// Invoked by owning thread as it prepares to sleep. Returns true
+ /// if the owning thread may proceed to fall asleep, false if the
+ /// latch was set in the meantime.
+ #[inline]
+ pub(super) fn get_sleepy(&self) -> bool {
+ self.state
+ .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Invoked by owning thread as it falls asleep sleep. Returns
+ /// true if the owning thread should block, or false if the latch
+ /// was set in the meantime.
+ #[inline]
+ pub(super) fn fall_asleep(&self) -> bool {
+ self.state
+ .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Invoked by owning thread as it falls asleep sleep. Returns
+ /// true if the owning thread should block, or false if the latch
+ /// was set in the meantime.
+ #[inline]
+ pub(super) fn wake_up(&self) {
+ if !self.probe() {
+ let _ =
+ self.state
+ .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
+ }
+ }
+
+ /// Set the latch. If this returns true, the owning thread was sleeping
+ /// and must be awoken.
+ ///
+ /// This is private because, typically, setting a latch involves
+ /// doing some wakeups; those are encapsulated in the surrounding
+ /// latch code.
+ #[inline]
+ fn set(&self) -> bool {
+ let old_state = self.state.swap(SET, Ordering::AcqRel);
+ old_state == SLEEPING
+ }
+
+ /// Test if this latch has been set.
+ #[inline]
+ pub(super) fn probe(&self) -> bool {
+ self.state.load(Ordering::Acquire) == SET
+ }
+}
+
+/// Spin latches are the simplest, most efficient kind, but they do
+/// not support a `wait()` operation. They just have a boolean flag
+/// that becomes true when `set()` is called.
+pub(super) struct SpinLatch<'r> {
+ core_latch: CoreLatch,
+ registry: &'r Arc<Registry>,
+ target_worker_index: usize,
+ cross: bool,
+}
+
+impl<'r> SpinLatch<'r> {
+ /// Creates a new spin latch that is owned by `thread`. This means
+ /// that `thread` is the only thread that should be blocking on
+ /// this latch -- it also means that when the latch is set, we
+ /// will wake `thread` if it is sleeping.
+ #[inline]
+ pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
+ SpinLatch {
+ core_latch: CoreLatch::new(),
+ registry: thread.registry(),
+ target_worker_index: thread.index(),
+ cross: false,
+ }
+ }
+
+ /// Creates a new spin latch for cross-threadpool blocking. Notably, we
+ /// need to make sure the registry is kept alive after setting, so we can
+ /// safely call the notification.
+ #[inline]
+ pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
+ SpinLatch {
+ cross: true,
+ ..SpinLatch::new(thread)
+ }
+ }
+
+ #[inline]
+ pub(super) fn probe(&self) -> bool {
+ self.core_latch.probe()
+ }
+}
+
+impl<'r> AsCoreLatch for SpinLatch<'r> {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ &self.core_latch
+ }
+}
+
+impl<'r> Latch for SpinLatch<'r> {
+ #[inline]
+ fn set(&self) {
+ let cross_registry;
+
+ let registry = if self.cross {
+ // Ensure the registry stays alive while we notify it.
+ // Otherwise, it would be possible that we set the spin
+ // latch and the other thread sees it and exits, causing
+ // the registry to be deallocated, all before we get a
+ // chance to invoke `registry.notify_worker_latch_is_set`.
+ cross_registry = Arc::clone(self.registry);
+ &cross_registry
+ } else {
+ // If this is not a "cross-registry" spin-latch, then the
+ // thread which is performing `set` is itself ensuring
+ // that the registry stays alive.
+ self.registry
+ };
+ let target_worker_index = self.target_worker_index;
+
+ // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
+ if self.core_latch.set() {
+ // Subtle: at this point, we can no longer read from
+ // `self`, because the thread owning this spin latch may
+ // have awoken and deallocated the latch. Therefore, we
+ // only use fields whose values we already read.
+ registry.notify_worker_latch_is_set(target_worker_index);
+ }
+ }
+}
+
+/// A Latch starts as false and eventually becomes true. You can block
+/// until it becomes true.
+pub(super) struct LockLatch {
+ m: Mutex<bool>,
+ v: Condvar,
+}
+
+impl LockLatch {
+ #[inline]
+ pub(super) fn new() -> LockLatch {
+ LockLatch {
+ m: Mutex::new(false),
+ v: Condvar::new(),
+ }
+ }
+
+ /// Block until latch is set, then resets this lock latch so it can be reused again.
+ pub(super) fn wait_and_reset(&self) {
+ let mut guard = self.m.lock().unwrap();
+ while !*guard {
+ guard = self.v.wait(guard).unwrap();
+ }
+ *guard = false;
+ }
+
+ /// Block until latch is set.
+ pub(super) fn wait(&self) {
+ let mut guard = self.m.lock().unwrap();
+ while !*guard {
+ guard = self.v.wait(guard).unwrap();
+ }
+ }
+}
+
+impl Latch for LockLatch {
+ #[inline]
+ fn set(&self) {
+ let mut guard = self.m.lock().unwrap();
+ *guard = true;
+ self.v.notify_all();
+ }
+}
+
+/// Counting latches are used to implement scopes. They track a
+/// counter. Unlike other latches, calling `set()` does not
+/// necessarily make the latch be considered `set()`; instead, it just
+/// decrements the counter. The latch is only "set" (in the sense that
+/// `probe()` returns true) once the counter reaches zero.
+///
+/// Note: like a `SpinLatch`, count laches are always associated with
+/// some registry that is probing them, which must be tickled when
+/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
+/// reference to that registry. This is because in some cases the
+/// registry owns the count-latch, and that would create a cycle. So a
+/// `CountLatch` must be given a reference to its owning registry when
+/// it is set. For this reason, it does not implement the `Latch`
+/// trait (but it doesn't have to, as it is not used in those generic
+/// contexts).
+#[derive(Debug)]
+pub(super) struct CountLatch {
+ core_latch: CoreLatch,
+ counter: AtomicUsize,
+}
+
+impl CountLatch {
+ #[inline]
+ pub(super) fn new() -> CountLatch {
+ CountLatch {
+ core_latch: CoreLatch::new(),
+ counter: AtomicUsize::new(1),
+ }
+ }
+
+ #[inline]
+ pub(super) fn increment(&self) {
+ debug_assert!(!self.core_latch.probe());
+ self.counter.fetch_add(1, Ordering::Relaxed);
+ }
+
+ /// Decrements the latch counter by one. If this is the final
+ /// count, then the latch is **set**, and calls to `probe()` will
+ /// return true. Returns whether the latch was set. This is an
+ /// internal operation, as it does not tickle, and to fail to
+ /// tickle would lead to deadlock.
+ #[inline]
+ fn set(&self) -> bool {
+ if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ self.core_latch.set();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Decrements the latch counter by one and possibly set it. If
+ /// the latch is set, then the specific worker thread is tickled,
+ /// which should be the one that owns this latch.
+ #[inline]
+ pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
+ if self.set() {
+ registry.notify_worker_latch_is_set(target_worker_index);
+ }
+ }
+}
+
+impl AsCoreLatch for CountLatch {
+ #[inline]
+ fn as_core_latch(&self) -> &CoreLatch {
+ &self.core_latch
+ }
+}
+
+impl<'a, L> Latch for &'a L
+where
+ L: Latch,
+{
+ #[inline]
+ fn set(&self) {
+ L::set(self);
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..f514bb6
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,758 @@
+//!
+//! [Under construction](https://github.com/rayon-rs/rayon/issues/231)
+//!
+//! ## Restricting multiple versions
+//!
+//! In order to ensure proper coordination between threadpools, and especially
+//! to make sure there's only one global threadpool, `rayon-core` is actively
+//! restricted from building multiple versions of itself into a single target.
+//! You may see a build error like this in violation:
+//!
+//! ```text
+//! error: native library `rayon-core` is being linked to by more
+//! than one package, and can only be linked to by one package
+//! ```
+//!
+//! While we strive to keep `rayon-core` semver-compatible, it's still
+//! possible to arrive at this situation if different crates have overly
+//! restrictive tilde or inequality requirements for `rayon-core`. The
+//! conflicting requirements will need to be resolved before the build will
+//! succeed.
+
+#![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
+#![deny(missing_debug_implementations)]
+#![deny(missing_docs)]
+#![deny(unreachable_pub)]
+#![warn(rust_2018_idioms)]
+
+use std::any::Any;
+use std::env;
+use std::error::Error;
+use std::fmt;
+use std::io;
+use std::marker::PhantomData;
+use std::str::FromStr;
+
+#[macro_use]
+mod log;
+#[macro_use]
+mod private;
+
+mod job;
+mod join;
+mod latch;
+mod registry;
+mod scope;
+mod sleep;
+mod spawn;
+mod thread_pool;
+mod unwind;
+mod util;
+
+mod compile_fail;
+mod test;
+
+pub use self::join::{join, join_context};
+pub use self::registry::ThreadBuilder;
+pub use self::scope::{scope, Scope};
+pub use self::scope::{scope_fifo, ScopeFifo};
+pub use self::spawn::{spawn, spawn_fifo};
+pub use self::thread_pool::current_thread_has_pending_tasks;
+pub use self::thread_pool::current_thread_index;
+pub use self::thread_pool::ThreadPool;
+
+use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
+
+/// Returns the number of threads in the current registry. If this
+/// code is executing within a Rayon thread-pool, then this will be
+/// the number of threads for the thread-pool of the current
+/// thread. Otherwise, it will be the number of threads for the global
+/// thread-pool.
+///
+/// This can be useful when trying to judge how many times to split
+/// parallel work (the parallel iterator traits use this value
+/// internally for this purpose).
+///
+/// # Future compatibility note
+///
+/// Note that unless this thread-pool was created with a
+/// builder that specifies the number of threads, then this
+/// number may vary over time in future versions (see [the
+/// `num_threads()` method for details][snt]).
+///
+/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+pub fn current_num_threads() -> usize {
+ crate::registry::Registry::current_num_threads()
+}
+
+/// Error when initializing a thread pool.
+#[derive(Debug)]
+pub struct ThreadPoolBuildError {
+ kind: ErrorKind,
+}
+
+#[derive(Debug)]
+enum ErrorKind {
+ GlobalPoolAlreadyInitialized,
+ IOError(io::Error),
+}
+
+/// Used to create a new [`ThreadPool`] or to configure the global rayon thread pool.
+/// ## Creating a ThreadPool
+/// The following creates a thread pool with 22 threads.
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let pool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+/// ```
+///
+/// To instead configure the global thread pool, use [`build_global()`]:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
+/// ```
+///
+/// [`ThreadPool`]: struct.ThreadPool.html
+/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
+pub struct ThreadPoolBuilder<S = DefaultSpawn> {
+ /// The number of threads in the rayon thread pool.
+ /// If zero will use the RAYON_NUM_THREADS environment variable.
+ /// If RAYON_NUM_THREADS is invalid or zero will use the default.
+ num_threads: usize,
+
+ /// Custom closure, if any, to handle a panic that we cannot propagate
+ /// anywhere else.
+ panic_handler: Option<Box<PanicHandler>>,
+
+ /// Closure to compute the name of a thread.
+ get_thread_name: Option<Box<dyn FnMut(usize) -> String>>,
+
+ /// The stack size for the created worker threads
+ stack_size: Option<usize>,
+
+ /// Closure invoked on worker thread start.
+ start_handler: Option<Box<StartHandler>>,
+
+ /// Closure invoked on worker thread exit.
+ exit_handler: Option<Box<ExitHandler>>,
+
+ /// Closure invoked to spawn threads.
+ spawn_handler: S,
+
+ /// If false, worker threads will execute spawned jobs in a
+ /// "depth-first" fashion. If true, they will do a "breadth-first"
+ /// fashion. Depth-first is the default.
+ breadth_first: bool,
+}
+
+/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
+///
+/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
+#[deprecated(note = "Use `ThreadPoolBuilder`")]
+pub struct Configuration {
+ builder: ThreadPoolBuilder,
+}
+
+/// The type for a panic handling closure. Note that this same closure
+/// may be invoked multiple times in parallel.
+type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
+
+/// The type for a closure that gets invoked when a thread starts. The
+/// closure is passed the index of the thread on which it is invoked.
+/// Note that this same closure may be invoked multiple times in parallel.
+type StartHandler = dyn Fn(usize) + Send + Sync;
+
+/// The type for a closure that gets invoked when a thread exits. The
+/// closure is passed the index of the thread on which is is invoked.
+/// Note that this same closure may be invoked multiple times in parallel.
+type ExitHandler = dyn Fn(usize) + Send + Sync;
+
+// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
+impl Default for ThreadPoolBuilder {
+ fn default() -> Self {
+ ThreadPoolBuilder {
+ num_threads: 0,
+ panic_handler: None,
+ get_thread_name: None,
+ stack_size: None,
+ start_handler: None,
+ exit_handler: None,
+ spawn_handler: DefaultSpawn,
+ breadth_first: false,
+ }
+ }
+}
+
+impl ThreadPoolBuilder {
+ /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
+/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
+impl<S> ThreadPoolBuilder<S>
+where
+ S: ThreadSpawn,
+{
+ /// Creates a new `ThreadPool` initialized using this configuration.
+ pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
+ ThreadPool::build(self)
+ }
+
+ /// Initializes the global thread pool. This initialization is
+ /// **optional**. If you do not call this function, the thread pool
+ /// will be automatically initialized with the default
+ /// configuration. Calling `build_global` is not recommended, except
+ /// in two scenarios:
+ ///
+ /// - You wish to change the default configuration.
+ /// - You are running a benchmark, in which case initializing may
+ /// yield slightly more consistent results, since the worker threads
+ /// will already be ready to go even in the first iteration. But
+ /// this cost is minimal.
+ ///
+ /// Initialization of the global thread pool happens exactly
+ /// once. Once started, the configuration cannot be
+ /// changed. Therefore, if you call `build_global` a second time, it
+ /// will return an error. An `Ok` result indicates that this
+ /// is the first initialization of the thread pool.
+ pub fn build_global(self) -> Result<(), ThreadPoolBuildError> {
+ let registry = registry::init_global_registry(self)?;
+ registry.wait_until_primed();
+ Ok(())
+ }
+}
+
+impl ThreadPoolBuilder {
+ /// Creates a scoped `ThreadPool` initialized using this configuration.
+ ///
+ /// This is a convenience function for building a pool using [`crossbeam::scope`]
+ /// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
+ /// The threads in this pool will start by calling `wrapper`, which should
+ /// do initialization and continue by calling `ThreadBuilder::run()`.
+ ///
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ ///
+ /// # Examples
+ ///
+ /// A scoped pool may be useful in combination with scoped thread-local variables.
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ ///
+ /// scoped_tls::scoped_thread_local!(static POOL_DATA: Vec<i32>);
+ ///
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// let pool_data = vec![1, 2, 3];
+ ///
+ /// // We haven't assigned any TLS data yet.
+ /// assert!(!POOL_DATA.is_set());
+ ///
+ /// rayon::ThreadPoolBuilder::new()
+ /// .build_scoped(
+ /// // Borrow `pool_data` in TLS for each thread.
+ /// |thread| POOL_DATA.set(&pool_data, || thread.run()),
+ /// // Do some work that needs the TLS data.
+ /// |pool| pool.install(|| assert!(POOL_DATA.is_set())),
+ /// )?;
+ ///
+ /// // Once we've returned, `pool_data` is no longer borrowed.
+ /// drop(pool_data);
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
+ where
+ W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
+ F: FnOnce(&ThreadPool) -> R,
+ {
+ let result = crossbeam_utils::thread::scope(|scope| {
+ let wrapper = &wrapper;
+ let pool = self
+ .spawn_handler(|thread| {
+ let mut builder = scope.builder();
+ if let Some(name) = thread.name() {
+ builder = builder.name(name.to_string());
+ }
+ if let Some(size) = thread.stack_size() {
+ builder = builder.stack_size(size);
+ }
+ builder.spawn(move |_| wrapper(thread))?;
+ Ok(())
+ })
+ .build()?;
+ Ok(with_pool(&pool))
+ });
+
+ match result {
+ Ok(result) => result,
+ Err(err) => unwind::resume_unwinding(err),
+ }
+ }
+}
+
+impl<S> ThreadPoolBuilder<S> {
+ /// Sets a custom function for spawning threads.
+ ///
+ /// Note that the threads will not exit until after the pool is dropped. It
+ /// is up to the caller to wait for thread termination if that is important
+ /// for any invariants. For instance, threads created in [`crossbeam::scope`]
+ /// will be joined before that scope returns, and this will block indefinitely
+ /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
+ /// until the entire process exits!
+ ///
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.7/crossbeam/fn.scope.html
+ ///
+ /// # Examples
+ ///
+ /// A minimal spawn handler just needs to call `run()` from an independent thread.
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// let pool = rayon::ThreadPoolBuilder::new()
+ /// .spawn_handler(|thread| {
+ /// std::thread::spawn(|| thread.run());
+ /// Ok(())
+ /// })
+ /// .build()?;
+ ///
+ /// pool.install(|| println!("Hello from my custom thread!"));
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// The default spawn handler sets the name and stack size if given, and propagates
+ /// any errors from the thread builder.
+ ///
+ /// ```
+ /// # use rayon_core as rayon;
+ /// fn main() -> Result<(), rayon::ThreadPoolBuildError> {
+ /// let pool = rayon::ThreadPoolBuilder::new()
+ /// .spawn_handler(|thread| {
+ /// let mut b = std::thread::Builder::new();
+ /// if let Some(name) = thread.name() {
+ /// b = b.name(name.to_owned());
+ /// }
+ /// if let Some(stack_size) = thread.stack_size() {
+ /// b = b.stack_size(stack_size);
+ /// }
+ /// b.spawn(|| thread.run())?;
+ /// Ok(())
+ /// })
+ /// .build()?;
+ ///
+ /// pool.install(|| println!("Hello from my fully custom thread!"));
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
+ where
+ F: FnMut(ThreadBuilder) -> io::Result<()>,
+ {
+ ThreadPoolBuilder {
+ spawn_handler: CustomSpawn::new(spawn),
+ // ..self
+ num_threads: self.num_threads,
+ panic_handler: self.panic_handler,
+ get_thread_name: self.get_thread_name,
+ stack_size: self.stack_size,
+ start_handler: self.start_handler,
+ exit_handler: self.exit_handler,
+ breadth_first: self.breadth_first,
+ }
+ }
+
+ /// Returns a reference to the current spawn handler.
+ fn get_spawn_handler(&mut self) -> &mut S {
+ &mut self.spawn_handler
+ }
+
+ /// Get the number of threads that will be used for the thread
+ /// pool. See `num_threads()` for more information.
+ fn get_num_threads(&self) -> usize {
+ if self.num_threads > 0 {
+ self.num_threads
+ } else {
+ match env::var("RAYON_NUM_THREADS")
+ .ok()
+ .and_then(|s| usize::from_str(&s).ok())
+ {
+ Some(x) if x > 0 => return x,
+ Some(x) if x == 0 => return num_cpus::get(),
+ _ => {}
+ }
+
+ // Support for deprecated `RAYON_RS_NUM_CPUS`.
+ match env::var("RAYON_RS_NUM_CPUS")
+ .ok()
+ .and_then(|s| usize::from_str(&s).ok())
+ {
+ Some(x) if x > 0 => x,
+ _ => num_cpus::get(),
+ }
+ }
+ }
+
+ /// Get the thread name for the thread with the given index.
+ fn get_thread_name(&mut self, index: usize) -> Option<String> {
+ let f = self.get_thread_name.as_mut()?;
+ Some(f(index))
+ }
+
+ /// Sets a closure which takes a thread index and returns
+ /// the thread's name.
+ pub fn thread_name<F>(mut self, closure: F) -> Self
+ where
+ F: FnMut(usize) -> String + 'static,
+ {
+ self.get_thread_name = Some(Box::new(closure));
+ self
+ }
+
+ /// Sets the number of threads to be used in the rayon threadpool.
+ ///
+ /// If you specify a non-zero number of threads using this
+ /// function, then the resulting thread-pools are guaranteed to
+ /// start at most this number of threads.
+ ///
+ /// If `num_threads` is 0, or you do not call this function, then
+ /// the Rayon runtime will select the number of threads
+ /// automatically. At present, this is based on the
+ /// `RAYON_NUM_THREADS` environment variable (if set),
+ /// or the number of logical CPUs (otherwise).
+ /// In the future, however, the default behavior may
+ /// change to dynamically add or remove threads as needed.
+ ///
+ /// **Future compatibility warning:** Given the default behavior
+ /// may change in the future, if you wish to rely on a fixed
+ /// number of threads, you should use this function to specify
+ /// that number. To reproduce the current default behavior, you
+ /// may wish to use the [`num_cpus`
+ /// crate](https://crates.io/crates/num_cpus) to query the number
+ /// of CPUs dynamically.
+ ///
+ /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
+ /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
+ /// variable. If both variables are specified, `RAYON_NUM_THREADS` will
+ /// be prefered.
+ pub fn num_threads(mut self, num_threads: usize) -> Self {
+ self.num_threads = num_threads;
+ self
+ }
+
+ /// Returns a copy of the current panic handler.
+ fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
+ self.panic_handler.take()
+ }
+
+ /// Normally, whenever Rayon catches a panic, it tries to
+ /// propagate it to someplace sensible, to try and reflect the
+ /// semantics of sequential execution. But in some cases,
+ /// particularly with the `spawn()` APIs, there is no
+ /// obvious place where we should propagate the panic to.
+ /// In that case, this panic handler is invoked.
+ ///
+ /// If no panic handler is set, the default is to abort the
+ /// process, under the principle that panics should not go
+ /// unobserved.
+ ///
+ /// If the panic handler itself panics, this will abort the
+ /// process. To prevent this, wrap the body of your panic handler
+ /// in a call to `std::panic::catch_unwind()`.
+ pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
+ where
+ H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
+ {
+ self.panic_handler = Some(Box::new(panic_handler));
+ self
+ }
+
+ /// Get the stack size of the worker threads
+ fn get_stack_size(&self) -> Option<usize> {
+ self.stack_size
+ }
+
+ /// Sets the stack size of the worker threads
+ pub fn stack_size(mut self, stack_size: usize) -> Self {
+ self.stack_size = Some(stack_size);
+ self
+ }
+
+ /// **(DEPRECATED)** Suggest to worker threads that they execute
+ /// spawned jobs in a "breadth-first" fashion.
+ ///
+ /// Typically, when a worker thread is idle or blocked, it will
+ /// attempt to execute the job from the *top* of its local deque of
+ /// work (i.e., the job most recently spawned). If this flag is set
+ /// to true, however, workers will prefer to execute in a
+ /// *breadth-first* fashion -- that is, they will search for jobs at
+ /// the *bottom* of their local deque. (At present, workers *always*
+ /// steal from the bottom of other worker's deques, regardless of
+ /// the setting of this flag.)
+ ///
+ /// If you think of the tasks as a tree, where a parent task
+ /// spawns its children in the tree, then this flag loosely
+ /// corresponds to doing a breadth-first traversal of the tree,
+ /// whereas the default would be to do a depth-first traversal.
+ ///
+ /// **Note that this is an "execution hint".** Rayon's task
+ /// execution is highly dynamic and the precise order in which
+ /// independent tasks are executed is not intended to be
+ /// guaranteed.
+ ///
+ /// This `breadth_first()` method is now deprecated per [RFC #1],
+ /// and in the future its effect may be removed. Consider using
+ /// [`scope_fifo()`] for a similar effect.
+ ///
+ /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+ /// [`scope_fifo()`]: fn.scope_fifo.html
+ #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
+ pub fn breadth_first(mut self) -> Self {
+ self.breadth_first = true;
+ self
+ }
+
+ fn get_breadth_first(&self) -> bool {
+ self.breadth_first
+ }
+
+ /// Takes the current thread start callback, leaving `None`.
+ fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
+ self.start_handler.take()
+ }
+
+ /// Sets a callback to be invoked on thread start.
+ ///
+ /// The closure is passed the index of the thread on which it is invoked.
+ /// Note that this same closure may be invoked multiple times in parallel.
+ /// If this closure panics, the panic will be passed to the panic handler.
+ /// If that handler returns, then startup will continue normally.
+ pub fn start_handler<H>(mut self, start_handler: H) -> Self
+ where
+ H: Fn(usize) + Send + Sync + 'static,
+ {
+ self.start_handler = Some(Box::new(start_handler));
+ self
+ }
+
+ /// Returns a current thread exit callback, leaving `None`.
+ fn take_exit_handler(&mut self) -> Option<Box<ExitHandler>> {
+ self.exit_handler.take()
+ }
+
+ /// Sets a callback to be invoked on thread exit.
+ ///
+ /// The closure is passed the index of the thread on which it is invoked.
+ /// Note that this same closure may be invoked multiple times in parallel.
+ /// If this closure panics, the panic will be passed to the panic handler.
+ /// If that handler returns, then the thread will exit normally.
+ pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
+ where
+ H: Fn(usize) + Send + Sync + 'static,
+ {
+ self.exit_handler = Some(Box::new(exit_handler));
+ self
+ }
+}
+
+#[allow(deprecated)]
+impl Configuration {
+ /// Creates and return a valid rayon thread pool configuration, but does not initialize it.
+ pub fn new() -> Configuration {
+ Configuration {
+ builder: ThreadPoolBuilder::new(),
+ }
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::build`.
+ pub fn build(self) -> Result<ThreadPool, Box<dyn Error + 'static>> {
+ self.builder.build().map_err(Box::from)
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::thread_name`.
+ pub fn thread_name<F>(mut self, closure: F) -> Self
+ where
+ F: FnMut(usize) -> String + 'static,
+ {
+ self.builder = self.builder.thread_name(closure);
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::num_threads`.
+ pub fn num_threads(mut self, num_threads: usize) -> Configuration {
+ self.builder = self.builder.num_threads(num_threads);
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::panic_handler`.
+ pub fn panic_handler<H>(mut self, panic_handler: H) -> Configuration
+ where
+ H: Fn(Box<dyn Any + Send>) + Send + Sync + 'static,
+ {
+ self.builder = self.builder.panic_handler(panic_handler);
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::stack_size`.
+ pub fn stack_size(mut self, stack_size: usize) -> Self {
+ self.builder = self.builder.stack_size(stack_size);
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::breadth_first`.
+ pub fn breadth_first(mut self) -> Self {
+ self.builder = self.builder.breadth_first();
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::start_handler`.
+ pub fn start_handler<H>(mut self, start_handler: H) -> Configuration
+ where
+ H: Fn(usize) + Send + Sync + 'static,
+ {
+ self.builder = self.builder.start_handler(start_handler);
+ self
+ }
+
+ /// Deprecated in favor of `ThreadPoolBuilder::exit_handler`.
+ pub fn exit_handler<H>(mut self, exit_handler: H) -> Configuration
+ where
+ H: Fn(usize) + Send + Sync + 'static,
+ {
+ self.builder = self.builder.exit_handler(exit_handler);
+ self
+ }
+
+ /// Returns a ThreadPoolBuilder with identical parameters.
+ fn into_builder(self) -> ThreadPoolBuilder {
+ self.builder
+ }
+}
+
+impl ThreadPoolBuildError {
+ fn new(kind: ErrorKind) -> ThreadPoolBuildError {
+ ThreadPoolBuildError { kind }
+ }
+}
+
+const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
+ "The global thread pool has already been initialized.";
+
+impl Error for ThreadPoolBuildError {
+ #[allow(deprecated)]
+ fn description(&self) -> &str {
+ match self.kind {
+ ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
+ ErrorKind::IOError(ref e) => e.description(),
+ }
+ }
+
+ fn source(&self) -> Option<&(dyn Error + 'static)> {
+ match &self.kind {
+ ErrorKind::GlobalPoolAlreadyInitialized => None,
+ ErrorKind::IOError(e) => Some(e),
+ }
+ }
+}
+
+impl fmt::Display for ThreadPoolBuildError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &self.kind {
+ ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
+ ErrorKind::IOError(e) => e.fmt(f),
+ }
+ }
+}
+
+/// Deprecated in favor of `ThreadPoolBuilder::build_global`.
+#[deprecated(note = "use `ThreadPoolBuilder::build_global`")]
+#[allow(deprecated)]
+pub fn initialize(config: Configuration) -> Result<(), Box<dyn Error>> {
+ config.into_builder().build_global().map_err(Box::from)
+}
+
+impl<S> fmt::Debug for ThreadPoolBuilder<S> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let ThreadPoolBuilder {
+ ref num_threads,
+ ref get_thread_name,
+ ref panic_handler,
+ ref stack_size,
+ ref start_handler,
+ ref exit_handler,
+ spawn_handler: _,
+ ref breadth_first,
+ } = *self;
+
+ // Just print `Some(<closure>)` or `None` to the debug
+ // output.
+ struct ClosurePlaceholder;
+ impl fmt::Debug for ClosurePlaceholder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("<closure>")
+ }
+ }
+ let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
+ let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
+ let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
+ let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
+
+ f.debug_struct("ThreadPoolBuilder")
+ .field("num_threads", num_threads)
+ .field("get_thread_name", &get_thread_name)
+ .field("panic_handler", &panic_handler)
+ .field("stack_size", &stack_size)
+ .field("start_handler", &start_handler)
+ .field("exit_handler", &exit_handler)
+ .field("breadth_first", &breadth_first)
+ .finish()
+ }
+}
+
+#[allow(deprecated)]
+impl Default for Configuration {
+ fn default() -> Self {
+ Configuration {
+ builder: Default::default(),
+ }
+ }
+}
+
+#[allow(deprecated)]
+impl fmt::Debug for Configuration {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.builder.fmt(f)
+ }
+}
+
+/// Provides the calling context to a closure called by `join_context`.
+#[derive(Debug)]
+pub struct FnContext {
+ migrated: bool,
+
+ /// disable `Send` and `Sync`, just for a little future-proofing.
+ _marker: PhantomData<*mut ()>,
+}
+
+impl FnContext {
+ #[inline]
+ fn new(migrated: bool) -> Self {
+ FnContext {
+ migrated,
+ _marker: PhantomData,
+ }
+ }
+}
+
+impl FnContext {
+ /// Returns `true` if the closure was called from a different thread
+ /// than it was provided from.
+ #[inline]
+ pub fn migrated(&self) -> bool {
+ self.migrated
+ }
+}
diff --git a/src/log.rs b/src/log.rs
new file mode 100644
index 0000000..e1ff827
--- /dev/null
+++ b/src/log.rs
@@ -0,0 +1,423 @@
+//! Debug Logging
+//!
+//! To use in a debug build, set the env var `RAYON_LOG` as
+//! described below. In a release build, logs are compiled out by
+//! default unless Rayon is built with `--cfg rayon_rs_log` (try
+//! `RUSTFLAGS="--cfg rayon_rs_log"`).
+//!
+//! Note that logs are an internally debugging tool and their format
+//! is considered unstable, as are the details of how to enable them.
+//!
+//! # Valid values for RAYON_LOG
+//!
+//! The `RAYON_LOG` variable can take on the following values:
+//!
+//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
+//! useful for tracking down deadlocks
+//! * `profile:<file>` -- dumps only those events needed to reconstruct how
+//! many workers are active at a given time
+//! * `all:<file>` -- dumps every event to the file; useful for debugging
+
+use crossbeam_channel::{self, Receiver, Sender};
+use std::collections::VecDeque;
+use std::env;
+use std::fs::File;
+use std::io::{self, BufWriter, Write};
+
+/// True if logs are compiled in.
+pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
+
+#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
+pub(super) enum Event {
+ /// Flushes events to disk, used to terminate benchmarking.
+ Flush,
+
+ /// Indicates that a worker thread started execution.
+ ThreadStart {
+ worker: usize,
+ terminate_addr: usize,
+ },
+
+ /// Indicates that a worker thread started execution.
+ ThreadTerminate { worker: usize },
+
+ /// Indicates that a worker thread became idle, blocked on `latch_addr`.
+ ThreadIdle { worker: usize, latch_addr: usize },
+
+ /// Indicates that an idle worker thread found work to do, after
+ /// yield rounds. It should no longer be considered idle.
+ ThreadFoundWork { worker: usize, yields: u32 },
+
+ /// Indicates that a worker blocked on a latch observed that it was set.
+ ///
+ /// Internal debugging event that does not affect the state
+ /// machine.
+ ThreadSawLatchSet { worker: usize, latch_addr: usize },
+
+ /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal
+ /// sleep state that we saw at the time.
+ ThreadSleepy { worker: usize, jobs_counter: usize },
+
+ /// Indicates that the thread's attempt to fall asleep was
+ /// interrupted because the latch was set. (This is not, in and of
+ /// itself, a change to the thread state.)
+ ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },
+
+ /// Indicates that the thread's attempt to fall asleep was
+ /// interrupted because a job was posted. (This is not, in and of
+ /// itself, a change to the thread state.)
+ ThreadSleepInterruptedByJob { worker: usize },
+
+ /// Indicates that an idle worker has gone to sleep.
+ ThreadSleeping { worker: usize, latch_addr: usize },
+
+ /// Indicates that a sleeping worker has awoken.
+ ThreadAwoken { worker: usize, latch_addr: usize },
+
+ /// Indicates that the given worker thread was notified it should
+ /// awaken.
+ ThreadNotify { worker: usize },
+
+ /// The given worker has pushed a job to its local deque.
+ JobPushed { worker: usize },
+
+ /// The given worker has popped a job from its local deque.
+ JobPopped { worker: usize },
+
+ /// The given worker has stolen a job from the deque of another.
+ JobStolen { worker: usize, victim: usize },
+
+ /// N jobs were injected into the global queue.
+ JobsInjected { count: usize },
+
+ /// A job was removed from the global queue.
+ JobUninjected { worker: usize },
+
+ /// When announcing a job, this was the value of the counters we observed.
+ ///
+ /// No effect on thread state, just a debugging event.
+ JobThreadCounts {
+ worker: usize,
+ num_idle: u16,
+ num_sleepers: u16,
+ },
+}
+
+/// Handle to the logging thread, if any. You can use this to deliver
+/// logs. You can also clone it freely.
+#[derive(Clone)]
+pub(super) struct Logger {
+ sender: Option<Sender<Event>>,
+}
+
+impl Logger {
+ pub(super) fn new(num_workers: usize) -> Logger {
+ if !LOG_ENABLED {
+ return Self::disabled();
+ }
+
+ // see the doc comment for the format
+ let env_log = match env::var("RAYON_LOG") {
+ Ok(s) => s,
+ Err(_) => return Self::disabled(),
+ };
+
+ let (sender, receiver) = crossbeam_channel::unbounded();
+
+ if env_log.starts_with("tail:") {
+ let filename = env_log["tail:".len()..].to_string();
+ ::std::thread::spawn(move || {
+ Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
+ });
+ } else if env_log == "all" {
+ ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
+ } else if env_log.starts_with("profile:") {
+ let filename = env_log["profile:".len()..].to_string();
+ ::std::thread::spawn(move || {
+ Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
+ });
+ } else {
+ panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
+ }
+
+ return Logger {
+ sender: Some(sender),
+ };
+ }
+
+ fn disabled() -> Logger {
+ Logger { sender: None }
+ }
+
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> Event) {
+ if !LOG_ENABLED {
+ return;
+ }
+
+ if let Some(sender) = &self.sender {
+ sender.send(event()).unwrap();
+ }
+ }
+
+ fn profile_logger_thread(
+ num_workers: usize,
+ log_filename: String,
+ capacity: usize,
+ receiver: Receiver<Event>,
+ ) {
+ let file = File::create(&log_filename)
+ .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
+
+ let mut writer = BufWriter::new(file);
+ let mut events = Vec::with_capacity(capacity);
+ let mut state = SimulatorState::new(num_workers);
+ let timeout = std::time::Duration::from_secs(30);
+
+ loop {
+ loop {
+ match receiver.recv_timeout(timeout) {
+ Ok(event) => {
+ if let Event::Flush = event {
+ break;
+ } else {
+ events.push(event);
+ }
+ }
+
+ Err(_) => break,
+ }
+
+ if events.len() == capacity {
+ break;
+ }
+ }
+
+ for event in events.drain(..) {
+ if state.simulate(&event) {
+ state.dump(&mut writer, &event).unwrap();
+ }
+ }
+
+ writer.flush().unwrap();
+ }
+ }
+
+ fn tail_logger_thread(
+ num_workers: usize,
+ log_filename: String,
+ capacity: usize,
+ receiver: Receiver<Event>,
+ ) {
+ let file = File::create(&log_filename)
+ .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
+
+ let mut writer = BufWriter::new(file);
+ let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
+ let mut state = SimulatorState::new(num_workers);
+ let timeout = std::time::Duration::from_secs(30);
+ let mut skipped = false;
+
+ loop {
+ loop {
+ match receiver.recv_timeout(timeout) {
+ Ok(event) => {
+ if let Event::Flush = event {
+ // We ignore Flush events in tail mode --
+ // we're really just looking for
+ // deadlocks.
+ continue;
+ } else {
+ if events.len() == capacity {
+ let event = events.pop_front().unwrap();
+ state.simulate(&event);
+ skipped = true;
+ }
+
+ events.push_back(event);
+ }
+ }
+
+ Err(_) => break,
+ }
+ }
+
+ if skipped {
+ write!(writer, "...\n").unwrap();
+ skipped = false;
+ }
+
+ for event in events.drain(..) {
+ // In tail mode, we dump *all* events out, whether or
+ // not they were 'interesting' to the state machine.
+ state.simulate(&event);
+ state.dump(&mut writer, &event).unwrap();
+ }
+
+ writer.flush().unwrap();
+ }
+ }
+
+ fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
+ let stderr = std::io::stderr();
+ let mut state = SimulatorState::new(num_workers);
+
+ for event in receiver {
+ let mut writer = BufWriter::new(stderr.lock());
+ state.simulate(&event);
+ state.dump(&mut writer, &event).unwrap();
+ writer.flush().unwrap();
+ }
+ }
+}
+
+#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
+enum State {
+ Working,
+ Idle,
+ Notified,
+ Sleeping,
+ Terminated,
+}
+
+impl State {
+ fn letter(&self) -> char {
+ match self {
+ State::Working => 'W',
+ State::Idle => 'I',
+ State::Notified => 'N',
+ State::Sleeping => 'S',
+ State::Terminated => 'T',
+ }
+ }
+}
+
+struct SimulatorState {
+ local_queue_size: Vec<usize>,
+ thread_states: Vec<State>,
+ injector_size: usize,
+}
+
+impl SimulatorState {
+ fn new(num_workers: usize) -> Self {
+ Self {
+ local_queue_size: (0..num_workers).map(|_| 0).collect(),
+ thread_states: (0..num_workers).map(|_| State::Working).collect(),
+ injector_size: 0,
+ }
+ }
+
+ fn simulate(&mut self, event: &Event) -> bool {
+ match *event {
+ Event::ThreadIdle { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Working);
+ self.thread_states[worker] = State::Idle;
+ true
+ }
+
+ Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
+ self.thread_states[worker] = State::Working;
+ true
+ }
+
+ Event::ThreadTerminate { worker, .. } => {
+ self.thread_states[worker] = State::Terminated;
+ true
+ }
+
+ Event::ThreadSleeping { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Idle);
+ self.thread_states[worker] = State::Sleeping;
+ true
+ }
+
+ Event::ThreadAwoken { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Notified);
+ self.thread_states[worker] = State::Idle;
+ true
+ }
+
+ Event::JobPushed { worker } => {
+ self.local_queue_size[worker] += 1;
+ true
+ }
+
+ Event::JobPopped { worker } => {
+ self.local_queue_size[worker] -= 1;
+ true
+ }
+
+ Event::JobStolen { victim, .. } => {
+ self.local_queue_size[victim] -= 1;
+ true
+ }
+
+ Event::JobsInjected { count } => {
+ self.injector_size += count;
+ true
+ }
+
+ Event::JobUninjected { .. } => {
+ self.injector_size -= 1;
+ true
+ }
+
+ Event::ThreadNotify { worker } => {
+ // Currently, this log event occurs while holding the
+ // thread lock, so we should *always* see it before
+ // the worker awakens.
+ assert_eq!(self.thread_states[worker], State::Sleeping);
+ self.thread_states[worker] = State::Notified;
+ true
+ }
+
+ // remaining events are no-ops from pov of simulating the
+ // thread state
+ _ => false,
+ }
+ }
+
+ fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
+ let num_idle_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Idle)
+ .count();
+
+ let num_sleeping_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Sleeping)
+ .count();
+
+ let num_notified_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Notified)
+ .count();
+
+ let num_pending_jobs: usize = self.local_queue_size.iter().sum();
+
+ write!(w, "{:2},", num_idle_threads)?;
+ write!(w, "{:2},", num_sleeping_threads)?;
+ write!(w, "{:2},", num_notified_threads)?;
+ write!(w, "{:4},", num_pending_jobs)?;
+ write!(w, "{:4},", self.injector_size)?;
+
+ let event_str = format!("{:?}", event);
+ write!(w, r#""{:60}","#, event_str)?;
+
+ for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
+ write!(w, " T{:02},{}", i, state.letter(),)?;
+
+ if *queue_size > 0 {
+ write!(w, ",{:03},", queue_size)?;
+ } else {
+ write!(w, ", ,")?;
+ }
+ }
+
+ write!(w, "\n")?;
+ Ok(())
+ }
+}
diff --git a/src/private.rs b/src/private.rs
new file mode 100644
index 0000000..c85e77b
--- /dev/null
+++ b/src/private.rs
@@ -0,0 +1,26 @@
+//! The public parts of this private module are used to create traits
+//! that cannot be implemented outside of our own crate. This way we
+//! can feel free to extend those traits without worrying about it
+//! being a breaking change for other implementations.
+
+/// If this type is pub but not publicly reachable, third parties
+/// can't name it and can't implement traits using it.
+#[allow(missing_debug_implementations)]
+pub struct PrivateMarker;
+
+macro_rules! private_decl {
+ () => {
+ /// This trait is private; this method exists to make it
+ /// impossible to implement outside the crate.
+ #[doc(hidden)]
+ fn __rayon_private__(&self) -> crate::private::PrivateMarker;
+ };
+}
+
+macro_rules! private_impl {
+ () => {
+ fn __rayon_private__(&self) -> crate::private::PrivateMarker {
+ crate::private::PrivateMarker
+ }
+ };
+}
diff --git a/src/registry.rs b/src/registry.rs
new file mode 100644
index 0000000..46ae10a
--- /dev/null
+++ b/src/registry.rs
@@ -0,0 +1,925 @@
+use crate::job::{JobFifo, JobRef, StackJob};
+use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
+use crate::log::Event::*;
+use crate::log::Logger;
+use crate::sleep::Sleep;
+use crate::unwind;
+use crate::util::leak;
+use crate::{
+ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+};
+use crossbeam_deque::{Injector, Steal, Stealer, Worker};
+use std::any::Any;
+use std::cell::Cell;
+use std::collections::hash_map::DefaultHasher;
+use std::fmt;
+use std::hash::Hasher;
+use std::io;
+use std::mem;
+use std::ptr;
+#[allow(deprecated)]
+use std::sync::atomic::ATOMIC_USIZE_INIT;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Once};
+use std::thread;
+use std::usize;
+
+/// Thread builder used for customization via
+/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
+pub struct ThreadBuilder {
+ name: Option<String>,
+ stack_size: Option<usize>,
+ worker: Worker<JobRef>,
+ registry: Arc<Registry>,
+ index: usize,
+}
+
+impl ThreadBuilder {
+ /// Gets the index of this thread in the pool, within `0..num_threads`.
+ pub fn index(&self) -> usize {
+ self.index
+ }
+
+ /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
+ pub fn name(&self) -> Option<&str> {
+ self.name.as_ref().map(String::as_str)
+ }
+
+ /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
+ pub fn stack_size(&self) -> Option<usize> {
+ self.stack_size
+ }
+
+ /// Executes the main loop for this thread. This will not return until the
+ /// thread pool is dropped.
+ pub fn run(self) {
+ unsafe { main_loop(self.worker, self.registry, self.index) }
+ }
+}
+
+impl fmt::Debug for ThreadBuilder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadBuilder")
+ .field("pool", &self.registry.id())
+ .field("index", &self.index)
+ .field("name", &self.name)
+ .field("stack_size", &self.stack_size)
+ .finish()
+ }
+}
+
+/// Generalized trait for spawning a thread in the `Registry`.
+///
+/// This trait is pub-in-private -- E0445 forces us to make it public,
+/// but we don't actually want to expose these details in the API.
+pub trait ThreadSpawn {
+ private_decl! {}
+
+ /// Spawn a thread with the `ThreadBuilder` parameters, and then
+ /// call `ThreadBuilder::run()`.
+ fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
+}
+
+/// Spawns a thread in the "normal" way with `std::thread::Builder`.
+///
+/// This type is pub-in-private -- E0445 forces us to make it public,
+/// but we don't actually want to expose these details in the API.
+#[derive(Debug, Default)]
+pub struct DefaultSpawn;
+
+impl ThreadSpawn for DefaultSpawn {
+ private_impl! {}
+
+ fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
+ let mut b = thread::Builder::new();
+ if let Some(name) = thread.name() {
+ b = b.name(name.to_owned());
+ }
+ if let Some(stack_size) = thread.stack_size() {
+ b = b.stack_size(stack_size);
+ }
+ b.spawn(|| thread.run())?;
+ Ok(())
+ }
+}
+
+/// Spawns a thread with a user's custom callback.
+///
+/// This type is pub-in-private -- E0445 forces us to make it public,
+/// but we don't actually want to expose these details in the API.
+#[derive(Debug)]
+pub struct CustomSpawn<F>(F);
+
+impl<F> CustomSpawn<F>
+where
+ F: FnMut(ThreadBuilder) -> io::Result<()>,
+{
+ pub(super) fn new(spawn: F) -> Self {
+ CustomSpawn(spawn)
+ }
+}
+
+impl<F> ThreadSpawn for CustomSpawn<F>
+where
+ F: FnMut(ThreadBuilder) -> io::Result<()>,
+{
+ private_impl! {}
+
+ #[inline]
+ fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
+ (self.0)(thread)
+ }
+}
+
+pub(super) struct Registry {
+ logger: Logger,
+ thread_infos: Vec<ThreadInfo>,
+ sleep: Sleep,
+ injected_jobs: Injector<JobRef>,
+ panic_handler: Option<Box<PanicHandler>>,
+ start_handler: Option<Box<StartHandler>>,
+ exit_handler: Option<Box<ExitHandler>>,
+
+ // When this latch reaches 0, it means that all work on this
+ // registry must be complete. This is ensured in the following ways:
+ //
+ // - if this is the global registry, there is a ref-count that never
+ // gets released.
+ // - if this is a user-created thread-pool, then so long as the thread-pool
+ // exists, it holds a reference.
+ // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
+ // no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
+ // return until the blocking job is complete, that ref will continue to be held.
+ // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
+ // These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
+ // and that job will keep the pool alive.
+ terminate_count: AtomicUsize,
+}
+
+/// ////////////////////////////////////////////////////////////////////////
+/// Initialization
+
+static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
+static THE_REGISTRY_SET: Once = Once::new();
+
+/// Starts the worker threads (if that has not already happened). If
+/// initialization has not already occurred, use the default
+/// configuration.
+fn global_registry() -> &'static Arc<Registry> {
+ set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
+ .expect("The global thread pool has not been initialized.")
+}
+
+/// Starts the worker threads (if that has not already happened) with
+/// the given builder.
+pub(super) fn init_global_registry<S>(
+ builder: ThreadPoolBuilder<S>,
+) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
+where
+ S: ThreadSpawn,
+{
+ set_global_registry(|| Registry::new(builder))
+}
+
+/// Starts the worker threads (if that has not already happened)
+/// by creating a registry with the given callback.
+fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
+where
+ F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
+{
+ let mut result = Err(ThreadPoolBuildError::new(
+ ErrorKind::GlobalPoolAlreadyInitialized,
+ ));
+ THE_REGISTRY_SET.call_once(|| {
+ result = registry().map(|registry| {
+ let registry = leak(registry);
+ unsafe {
+ THE_REGISTRY = Some(registry);
+ }
+ registry
+ });
+ });
+ result
+}
+
+struct Terminator<'a>(&'a Arc<Registry>);
+
+impl<'a> Drop for Terminator<'a> {
+ fn drop(&mut self) {
+ self.0.terminate()
+ }
+}
+
+impl Registry {
+ pub(super) fn new<S>(
+ mut builder: ThreadPoolBuilder<S>,
+ ) -> Result<Arc<Self>, ThreadPoolBuildError>
+ where
+ S: ThreadSpawn,
+ {
+ let n_threads = builder.get_num_threads();
+ let breadth_first = builder.get_breadth_first();
+
+ let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
+ .map(|_| {
+ let worker = if breadth_first {
+ Worker::new_fifo()
+ } else {
+ Worker::new_lifo()
+ };
+
+ let stealer = worker.stealer();
+ (worker, stealer)
+ })
+ .unzip();
+
+ let logger = Logger::new(n_threads);
+ let registry = Arc::new(Registry {
+ logger: logger.clone(),
+ thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
+ sleep: Sleep::new(logger, n_threads),
+ injected_jobs: Injector::new(),
+ terminate_count: AtomicUsize::new(1),
+ panic_handler: builder.take_panic_handler(),
+ start_handler: builder.take_start_handler(),
+ exit_handler: builder.take_exit_handler(),
+ });
+
+ // If we return early or panic, make sure to terminate existing threads.
+ let t1000 = Terminator(&registry);
+
+ for (index, worker) in workers.into_iter().enumerate() {
+ let thread = ThreadBuilder {
+ name: builder.get_thread_name(index),
+ stack_size: builder.get_stack_size(),
+ registry: registry.clone(),
+ worker,
+ index,
+ };
+ if let Err(e) = builder.get_spawn_handler().spawn(thread) {
+ return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
+ }
+ }
+
+ // Returning normally now, without termination.
+ mem::forget(t1000);
+
+ Ok(registry.clone())
+ }
+
+ pub(super) fn current() -> Arc<Registry> {
+ unsafe {
+ let worker_thread = WorkerThread::current();
+ if worker_thread.is_null() {
+ global_registry().clone()
+ } else {
+ (*worker_thread).registry.clone()
+ }
+ }
+ }
+
+ /// Returns the number of threads in the current registry. This
+ /// is better than `Registry::current().num_threads()` because it
+ /// avoids incrementing the `Arc`.
+ pub(super) fn current_num_threads() -> usize {
+ unsafe {
+ let worker_thread = WorkerThread::current();
+ if worker_thread.is_null() {
+ global_registry().num_threads()
+ } else {
+ (*worker_thread).registry.num_threads()
+ }
+ }
+ }
+
+ /// Returns the current `WorkerThread` if it's part of this `Registry`.
+ pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
+ unsafe {
+ let worker = WorkerThread::current().as_ref()?;
+ if worker.registry().id() == self.id() {
+ Some(worker)
+ } else {
+ None
+ }
+ }
+ }
+
+ /// Returns an opaque identifier for this registry.
+ pub(super) fn id(&self) -> RegistryId {
+ // We can rely on `self` not to change since we only ever create
+ // registries that are boxed up in an `Arc` (see `new()` above).
+ RegistryId {
+ addr: self as *const Self as usize,
+ }
+ }
+
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+ self.logger.log(event)
+ }
+
+ pub(super) fn num_threads(&self) -> usize {
+ self.thread_infos.len()
+ }
+
+ pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
+ match self.panic_handler {
+ Some(ref handler) => {
+ // If the customizable panic handler itself panics,
+ // then we abort.
+ let abort_guard = unwind::AbortIfPanic;
+ handler(err);
+ mem::forget(abort_guard);
+ }
+ None => {
+ // Default panic handler aborts.
+ let _ = unwind::AbortIfPanic; // let this drop.
+ }
+ }
+ }
+
+ /// Waits for the worker threads to get up and running. This is
+ /// meant to be used for benchmarking purposes, primarily, so that
+ /// you can get more consistent numbers by having everything
+ /// "ready to go".
+ pub(super) fn wait_until_primed(&self) {
+ for info in &self.thread_infos {
+ info.primed.wait();
+ }
+ }
+
+ /// Waits for the worker threads to stop. This is used for testing
+ /// -- so we can check that termination actually works.
+ #[cfg(test)]
+ pub(super) fn wait_until_stopped(&self) {
+ for info in &self.thread_infos {
+ info.stopped.wait();
+ }
+ }
+
+ /// ////////////////////////////////////////////////////////////////////////
+ /// MAIN LOOP
+ ///
+ /// So long as all of the worker threads are hanging out in their
+ /// top-level loop, there is no work to be done.
+
+ /// Push a job into the given `registry`. If we are running on a
+ /// worker thread for the registry, this will push onto the
+ /// deque. Else, it will inject from the outside (which is slower).
+ pub(super) fn inject_or_push(&self, job_ref: JobRef) {
+ let worker_thread = WorkerThread::current();
+ unsafe {
+ if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
+ (*worker_thread).push(job_ref);
+ } else {
+ self.inject(&[job_ref]);
+ }
+ }
+ }
+
+ /// Push a job into the "external jobs" queue; it will be taken by
+ /// whatever worker has nothing to do. Use this is you know that
+ /// you are not on a worker of this registry.
+ pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
+ self.log(|| JobsInjected {
+ count: injected_jobs.len(),
+ });
+
+ // It should not be possible for `state.terminate` to be true
+ // here. It is only set to true when the user creates (and
+ // drops) a `ThreadPool`; and, in that case, they cannot be
+ // calling `inject()` later, since they dropped their
+ // `ThreadPool`.
+ debug_assert_ne!(
+ self.terminate_count.load(Ordering::Acquire),
+ 0,
+ "inject() sees state.terminate as true"
+ );
+
+ let queue_was_empty = self.injected_jobs.is_empty();
+
+ for &job_ref in injected_jobs {
+ self.injected_jobs.push(job_ref);
+ }
+
+ self.sleep
+ .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ }
+
+ fn has_injected_job(&self) -> bool {
+ !self.injected_jobs.is_empty()
+ }
+
+ fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
+ loop {
+ match self.injected_jobs.steal() {
+ Steal::Success(job) => {
+ self.log(|| JobUninjected {
+ worker: worker_index,
+ });
+ return Some(job);
+ }
+ Steal::Empty => return None,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ /// If already in a worker-thread of this registry, just execute `op`.
+ /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
+ /// completes and return its return value. If `op` panics, that panic will
+ /// be propagated as well. The second argument indicates `true` if injection
+ /// was performed, `false` if executed directly.
+ pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&WorkerThread, bool) -> R + Send,
+ R: Send,
+ {
+ unsafe {
+ let worker_thread = WorkerThread::current();
+ if worker_thread.is_null() {
+ self.in_worker_cold(op)
+ } else if (*worker_thread).registry().id() != self.id() {
+ self.in_worker_cross(&*worker_thread, op)
+ } else {
+ // Perfectly valid to give them a `&T`: this is the
+ // current thread, so we know the data structure won't be
+ // invalidated until we return.
+ op(&*worker_thread, false)
+ }
+ }
+ }
+
+ #[cold]
+ unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&WorkerThread, bool) -> R + Send,
+ R: Send,
+ {
+ thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());
+
+ LOCK_LATCH.with(|l| {
+ // This thread isn't a member of *any* thread pool, so just block.
+ debug_assert!(WorkerThread::current().is_null());
+ let job = StackJob::new(
+ |injected| {
+ let worker_thread = WorkerThread::current();
+ assert!(injected && !worker_thread.is_null());
+ op(&*worker_thread, true)
+ },
+ l,
+ );
+ self.inject(&[job.as_job_ref()]);
+ job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
+
+ // flush accumulated logs as we exit the thread
+ self.logger.log(|| Flush);
+
+ job.into_result()
+ })
+ }
+
+ #[cold]
+ unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
+ where
+ OP: FnOnce(&WorkerThread, bool) -> R + Send,
+ R: Send,
+ {
+ // This thread is a member of a different pool, so let it process
+ // other work while waiting for this `op` to complete.
+ debug_assert!(current_thread.registry().id() != self.id());
+ let latch = SpinLatch::cross(current_thread);
+ let job = StackJob::new(
+ |injected| {
+ let worker_thread = WorkerThread::current();
+ assert!(injected && !worker_thread.is_null());
+ op(&*worker_thread, true)
+ },
+ latch,
+ );
+ self.inject(&[job.as_job_ref()]);
+ current_thread.wait_until(&job.latch);
+ job.into_result()
+ }
+
+ /// Increments the terminate counter. This increment should be
+ /// balanced by a call to `terminate`, which will decrement. This
+ /// is used when spawning asynchronous work, which needs to
+ /// prevent the registry from terminating so long as it is active.
+ ///
+ /// Note that blocking functions such as `join` and `scope` do not
+ /// need to concern themselves with this fn; their context is
+ /// responsible for ensuring the current thread-pool will not
+ /// terminate until they return.
+ ///
+ /// The global thread-pool always has an outstanding reference
+ /// (the initial one). Custom thread-pools have one outstanding
+ /// reference that is dropped when the `ThreadPool` is dropped:
+ /// since installing the thread-pool blocks until any joins/scopes
+ /// complete, this ensures that joins/scopes are covered.
+ ///
+ /// The exception is `::spawn()`, which can create a job outside
+ /// of any blocking scope. In that case, the job itself holds a
+ /// terminate count and is responsible for invoking `terminate()`
+ /// when finished.
+ pub(super) fn increment_terminate_count(&self) {
+ let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
+ debug_assert!(previous != 0, "registry ref count incremented from zero");
+ assert!(
+ previous != std::usize::MAX,
+ "overflow in registry ref count"
+ );
+ }
+
+ /// Signals that the thread-pool which owns this registry has been
+ /// dropped. The worker threads will gradually terminate, once any
+ /// extant work is completed.
+ pub(super) fn terminate(&self) {
+ if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
+ for (i, thread_info) in self.thread_infos.iter().enumerate() {
+ thread_info.terminate.set_and_tickle_one(self, i);
+ }
+ }
+ }
+
+ /// Notify the worker that the latch they are sleeping on has been "set".
+ pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+ self.sleep.notify_worker_latch_is_set(target_worker_index);
+ }
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub(super) struct RegistryId {
+ addr: usize,
+}
+
+struct ThreadInfo {
+ /// Latch set once thread has started and we are entering into the
+ /// main loop. Used to wait for worker threads to become primed,
+ /// primarily of interest for benchmarking.
+ primed: LockLatch,
+
+ /// Latch is set once worker thread has completed. Used to wait
+ /// until workers have stopped; only used for tests.
+ stopped: LockLatch,
+
+ /// The latch used to signal that terminated has been requested.
+ /// This latch is *set* by the `terminate` method on the
+ /// `Registry`, once the registry's main "terminate" counter
+ /// reaches zero.
+ ///
+ /// NB. We use a `CountLatch` here because it has no lifetimes and is
+ /// meant for async use, but the count never gets higher than one.
+ terminate: CountLatch,
+
+ /// the "stealer" half of the worker's deque
+ stealer: Stealer<JobRef>,
+}
+
+impl ThreadInfo {
+ fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
+ ThreadInfo {
+ primed: LockLatch::new(),
+ stopped: LockLatch::new(),
+ terminate: CountLatch::new(),
+ stealer,
+ }
+ }
+}
+
+/// ////////////////////////////////////////////////////////////////////////
+/// WorkerThread identifiers
+
+pub(super) struct WorkerThread {
+ /// the "worker" half of our local deque
+ worker: Worker<JobRef>,
+
+ /// local queue used for `spawn_fifo` indirection
+ fifo: JobFifo,
+
+ index: usize,
+
+ /// A weak random number generator.
+ rng: XorShift64Star,
+
+ registry: Arc<Registry>,
+}
+
+// This is a bit sketchy, but basically: the WorkerThread is
+// allocated on the stack of the worker on entry and stored into this
+// thread local variable. So it will remain valid at least until the
+// worker is fully unwound. Using an unsafe pointer avoids the need
+// for a RefCell<T> etc.
+thread_local! {
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+}
+
+impl Drop for WorkerThread {
+ fn drop(&mut self) {
+ // Undo `set_current`
+ WORKER_THREAD_STATE.with(|t| {
+ assert!(t.get().eq(&(self as *const _)));
+ t.set(ptr::null());
+ });
+ }
+}
+
+impl WorkerThread {
+ /// Gets the `WorkerThread` index for the current thread; returns
+ /// NULL if this is not a worker thread. This pointer is valid
+ /// anywhere on the current thread.
+ #[inline]
+ pub(super) fn current() -> *const WorkerThread {
+ WORKER_THREAD_STATE.with(Cell::get)
+ }
+
+ /// Sets `self` as the worker thread index for the current thread.
+ /// This is done during worker thread startup.
+ unsafe fn set_current(thread: *const WorkerThread) {
+ WORKER_THREAD_STATE.with(|t| {
+ assert!(t.get().is_null());
+ t.set(thread);
+ });
+ }
+
+ /// Returns the registry that owns this worker thread.
+ #[inline]
+ pub(super) fn registry(&self) -> &Arc<Registry> {
+ &self.registry
+ }
+
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
+ self.registry.logger.log(event)
+ }
+
+ /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
+ #[inline]
+ pub(super) fn index(&self) -> usize {
+ self.index
+ }
+
+ #[inline]
+ pub(super) unsafe fn push(&self, job: JobRef) {
+ self.log(|| JobPushed { worker: self.index });
+ let queue_was_empty = self.worker.is_empty();
+ self.worker.push(job);
+ self.registry
+ .sleep
+ .new_internal_jobs(self.index, 1, queue_was_empty);
+ }
+
+ #[inline]
+ pub(super) unsafe fn push_fifo(&self, job: JobRef) {
+ self.push(self.fifo.push(job));
+ }
+
+ #[inline]
+ pub(super) fn local_deque_is_empty(&self) -> bool {
+ self.worker.is_empty()
+ }
+
+ /// Attempts to obtain a "local" job -- typically this means
+ /// popping from the top of the stack, though if we are configured
+ /// for breadth-first execution, it would mean dequeuing from the
+ /// bottom.
+ #[inline]
+ pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ let popped_job = self.worker.pop();
+
+ if popped_job.is_some() {
+ self.log(|| JobPopped { worker: self.index });
+ }
+
+ popped_job
+ }
+
+ /// Wait until the latch is set. Try to keep busy by popping and
+ /// stealing tasks as necessary.
+ #[inline]
+ pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
+ let latch = latch.as_core_latch();
+ if !latch.probe() {
+ self.wait_until_cold(latch);
+ }
+ }
+
+ #[cold]
+ unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
+ // the code below should swallow all panics and hence never
+ // unwind; but if something does wrong, we want to abort,
+ // because otherwise other code in rayon may assume that the
+ // latch has been signaled, and that can lead to random memory
+ // accesses, which would be *very bad*
+ let abort_guard = unwind::AbortIfPanic;
+
+ let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
+ while !latch.probe() {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ if let Some(job) = self
+ .take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ {
+ self.registry.sleep.work_found(idle_state);
+ self.execute(job);
+ idle_state = self.registry.sleep.start_looking(self.index, latch);
+ } else {
+ self.registry
+ .sleep
+ .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
+ }
+ }
+
+ // If we were sleepy, we are not anymore. We "found work" --
+ // whatever the surrounding thread was doing before it had to
+ // wait.
+ self.registry.sleep.work_found(idle_state);
+
+ self.log(|| ThreadSawLatchSet {
+ worker: self.index,
+ latch_addr: latch.addr(),
+ });
+ mem::forget(abort_guard); // successful execution, do not abort
+ }
+
+ #[inline]
+ pub(super) unsafe fn execute(&self, job: JobRef) {
+ job.execute();
+ }
+
+ /// Try to steal a single job and return it.
+ ///
+ /// This should only be done as a last resort, when there is no
+ /// local work to do.
+ unsafe fn steal(&self) -> Option<JobRef> {
+ // we only steal when we don't have any work to do locally
+ debug_assert!(self.local_deque_is_empty());
+
+ // otherwise, try to steal
+ let thread_infos = &self.registry.thread_infos.as_slice();
+ let num_threads = thread_infos.len();
+ if num_threads <= 1 {
+ return None;
+ }
+
+ loop {
+ let mut retry = false;
+ let start = self.rng.next_usize(num_threads);
+ let job = (start..num_threads)
+ .chain(0..start)
+ .filter(move |&i| i != self.index)
+ .find_map(|victim_index| {
+ let victim = &thread_infos[victim_index];
+ match victim.stealer.steal() {
+ Steal::Success(job) => {
+ self.log(|| JobStolen {
+ worker: self.index,
+ victim: victim_index,
+ });
+ Some(job)
+ }
+ Steal::Empty => None,
+ Steal::Retry => {
+ retry = true;
+ None
+ }
+ }
+ });
+ if job.is_some() || !retry {
+ return job;
+ }
+ }
+ }
+}
+
+/// ////////////////////////////////////////////////////////////////////////
+
+unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
+ let worker_thread = &WorkerThread {
+ worker,
+ fifo: JobFifo::new(),
+ index,
+ rng: XorShift64Star::new(),
+ registry: registry.clone(),
+ };
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ registry.thread_infos[index].primed.set();
+
+ // Worker threads should not panic. If they do, just abort, as the
+ // internal state of the threadpool is corrupted. Note that if
+ // **user code** panics, we should catch that and redirect.
+ let abort_guard = unwind::AbortIfPanic;
+
+ // Inform a user callback that we started a thread.
+ if let Some(ref handler) = registry.start_handler {
+ let registry = registry.clone();
+ match unwind::halt_unwinding(|| handler(index)) {
+ Ok(()) => {}
+ Err(err) => {
+ registry.handle_panic(err);
+ }
+ }
+ }
+
+ let my_terminate_latch = &registry.thread_infos[index].terminate;
+ worker_thread.log(|| ThreadStart {
+ worker: index,
+ terminate_addr: my_terminate_latch.as_core_latch().addr(),
+ });
+ worker_thread.wait_until(my_terminate_latch);
+
+ // Should not be any work left in our queue.
+ debug_assert!(worker_thread.take_local_job().is_none());
+
+ // let registry know we are done
+ registry.thread_infos[index].stopped.set();
+
+ // Normal termination, do not abort.
+ mem::forget(abort_guard);
+
+ worker_thread.log(|| ThreadTerminate { worker: index });
+
+ // Inform a user callback that we exited a thread.
+ if let Some(ref handler) = registry.exit_handler {
+ let registry = registry.clone();
+ match unwind::halt_unwinding(|| handler(index)) {
+ Ok(()) => {}
+ Err(err) => {
+ registry.handle_panic(err);
+ }
+ }
+ // We're already exiting the thread, there's nothing else to do.
+ }
+}
+
+/// If already in a worker-thread, just execute `op`. Otherwise,
+/// execute `op` in the default thread-pool. Either way, block until
+/// `op` completes and return its return value. If `op` panics, that
+/// panic will be propagated as well. The second argument indicates
+/// `true` if injection was performed, `false` if executed directly.
+pub(super) fn in_worker<OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&WorkerThread, bool) -> R + Send,
+ R: Send,
+{
+ unsafe {
+ let owner_thread = WorkerThread::current();
+ if !owner_thread.is_null() {
+ // Perfectly valid to give them a `&T`: this is the
+ // current thread, so we know the data structure won't be
+ // invalidated until we return.
+ op(&*owner_thread, false)
+ } else {
+ global_registry().in_worker_cold(op)
+ }
+ }
+}
+
+/// [xorshift*] is a fast pseudorandom number generator which will
+/// even tolerate weak seeding, as long as it's not zero.
+///
+/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
+struct XorShift64Star {
+ state: Cell<u64>,
+}
+
+impl XorShift64Star {
+ fn new() -> Self {
+ // Any non-zero seed will do -- this uses the hash of a global counter.
+ let mut seed = 0;
+ while seed == 0 {
+ let mut hasher = DefaultHasher::new();
+ #[allow(deprecated)]
+ static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
+ hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
+ seed = hasher.finish();
+ }
+
+ XorShift64Star {
+ state: Cell::new(seed),
+ }
+ }
+
+ fn next(&self) -> u64 {
+ let mut x = self.state.get();
+ debug_assert_ne!(x, 0);
+ x ^= x >> 12;
+ x ^= x << 25;
+ x ^= x >> 27;
+ self.state.set(x);
+ x.wrapping_mul(0x2545_f491_4f6c_dd1d)
+ }
+
+ /// Return a value from `0..n`.
+ fn next_usize(&self, n: usize) -> usize {
+ (self.next() % n as u64) as usize
+ }
+}
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
new file mode 100644
index 0000000..a41d408
--- /dev/null
+++ b/src/scope/mod.rs
@@ -0,0 +1,630 @@
+//! Methods for custom fork-join scopes, created by the [`scope()`]
+//! function. These are a more flexible alternative to [`join()`].
+//!
+//! [`scope()`]: fn.scope.html
+//! [`join()`]: ../join/join.fn.html
+
+use crate::job::{HeapJob, JobFifo};
+use crate::latch::CountLatch;
+use crate::registry::{in_worker, Registry, WorkerThread};
+use crate::unwind;
+use std::any::Any;
+use std::fmt;
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::Arc;
+
+#[cfg(test)]
+mod test;
+
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// See [`scope()`] for more information.
+///
+///[`scope()`]: fn.scope.html
+pub struct Scope<'scope> {
+ base: ScopeBase<'scope>,
+}
+
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// Those spawned from the same thread are prioritized in relative FIFO order.
+/// See [`scope_fifo()`] for more information.
+///
+///[`scope_fifo()`]: fn.scope_fifo.html
+pub struct ScopeFifo<'scope> {
+ base: ScopeBase<'scope>,
+ fifos: Vec<JobFifo>,
+}
+
+struct ScopeBase<'scope> {
+ /// thread where `scope()` was executed (note that individual jobs
+ /// may be executing on different worker threads, though they
+ /// should always be within the same pool of threads)
+ owner_thread_index: usize,
+
+ /// thread registry where `scope()` was executed.
+ registry: Arc<Registry>,
+
+ /// if some job panicked, the error is stored here; it will be
+ /// propagated to the one who created the scope
+ panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
+
+ /// latch to set when the counter drops to zero (and hence this scope is complete)
+ job_completed_latch: CountLatch,
+
+ /// You can think of a scope as containing a list of closures to execute,
+ /// all of which outlive `'scope`. They're not actually required to be
+ /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
+ /// the closures are only *moved* across threads to be executed.
+ marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
+}
+
+/// Creates a "fork-join" scope `s` and invokes the closure with a
+/// reference to `s`. This closure can then spawn asynchronous tasks
+/// into `s`. Those tasks may run asynchronously with respect to the
+/// closure; they may themselves spawn additional tasks into `s`. When
+/// the closure returns, it will block until all tasks that have been
+/// spawned into `s` complete.
+///
+/// `scope()` is a more flexible building block compared to `join()`,
+/// since a loop can be used to spawn any number of tasks without
+/// recursing. However, that flexibility comes at a performance price:
+/// tasks spawned using `scope()` must be allocated onto the heap,
+/// whereas `join()` can make exclusive use of the stack. **Prefer
+/// `join()` (or, even better, parallel iterators) where possible.**
+///
+/// # Example
+///
+/// The Rayon `join()` function launches two closures and waits for them
+/// to stop. One could implement `join()` using a scope like so, although
+/// it would be less efficient than the real implementation:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
+/// where A: FnOnce() -> RA + Send,
+/// B: FnOnce() -> RB + Send,
+/// RA: Send,
+/// RB: Send,
+/// {
+/// let mut result_a: Option<RA> = None;
+/// let mut result_b: Option<RB> = None;
+/// rayon::scope(|s| {
+/// s.spawn(|_| result_a = Some(oper_a()));
+/// s.spawn(|_| result_b = Some(oper_b()));
+/// });
+/// (result_a.unwrap(), result_b.unwrap())
+/// }
+/// ```
+///
+/// # A note on threading
+///
+/// The closure given to `scope()` executes in the Rayon thread-pool,
+/// as do those given to `spawn()`. This means that you can't access
+/// thread-local variables (well, you can, but they may have
+/// unexpected values).
+///
+/// # Task execution
+///
+/// Task execution potentially starts as soon as `spawn()` is called.
+/// The task will end sometime before `scope()` returns. Note that the
+/// *closure* given to scope may return much earlier. In general
+/// the lifetime of a scope created like `scope(body) goes something like this:
+///
+/// - Scope begins when `scope(body)` is called
+/// - Scope body `body()` is invoked
+/// - Scope tasks may be spawned
+/// - Scope body returns
+/// - Scope tasks execute, possibly spawning more tasks
+/// - Once all tasks are done, scope ends and `scope()` returns
+///
+/// To see how and when tasks are joined, consider this example:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// // point start
+/// rayon::scope(|s| {
+/// s.spawn(|s| { // task s.1
+/// s.spawn(|s| { // task s.1.1
+/// rayon::scope(|t| {
+/// t.spawn(|_| ()); // task t.1
+/// t.spawn(|_| ()); // task t.2
+/// });
+/// });
+/// });
+/// s.spawn(|s| { // task s.2
+/// });
+/// // point mid
+/// });
+/// // point end
+/// ```
+///
+/// The various tasks that are run will execute roughly like so:
+///
+/// ```notrust
+/// | (start)
+/// |
+/// | (scope `s` created)
+/// +-----------------------------------------------+ (task s.2)
+/// +-------+ (task s.1) |
+/// | | |
+/// | +---+ (task s.1.1) |
+/// | | | |
+/// | | | (scope `t` created) |
+/// | | +----------------+ (task t.2) |
+/// | | +---+ (task t.1) | |
+/// | (mid) | | | | |
+/// : | + <-+------------+ (scope `t` ends) |
+/// : | | |
+/// |<------+---+-----------------------------------+ (scope `s` ends)
+/// |
+/// | (end)
+/// ```
+///
+/// The point here is that everything spawned into scope `s` will
+/// terminate (at latest) at the same point -- right before the
+/// original call to `rayon::scope` returns. This includes new
+/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
+/// scope is created (such as `t`), the things spawned into that scope
+/// will be joined before that scope returns, which in turn occurs
+/// before the creating task (task `s.1.1` in this case) finishes.
+///
+/// There is no guaranteed order of execution for spawns in a scope,
+/// given that other threads may steal tasks at any time. However, they
+/// are generally prioritized in a LIFO order on the thread from which
+/// they were spawned. So in this example, absent any stealing, we can
+/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
+/// threads always steal from the other end of the deque, like FIFO
+/// order. The idea is that "recent" tasks are most likely to be fresh
+/// in the local CPU's cache, while other threads can steal older
+/// "stale" tasks. For an alternate approach, consider
+/// [`scope_fifo()`] instead.
+///
+/// [`scope_fifo()`]: fn.scope_fifo.html
+///
+/// # Accessing stack data
+///
+/// In general, spawned tasks may access stack data in place that
+/// outlives the scope itself. Other data must be fully owned by the
+/// spawned task.
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(|_| {
+/// // We can access `ok` because outlives the scope `s`.
+/// println!("ok: {:?}", ok);
+///
+/// // If we just try to use `bad` here, the closure will borrow `bad`
+/// // (because we are just printing it out, and that only requires a
+/// // borrow), which will result in a compilation error. Read on
+/// // for options.
+/// // println!("bad: {:?}", bad);
+/// });
+/// });
+/// ```
+///
+/// As the comments example above suggest, to reference `bad` we must
+/// take ownership of it. One way to do this is to detach the closure
+/// from the surrounding stack frame, using the `move` keyword. This
+/// will cause it to take ownership of *all* the variables it touches,
+/// in this case including both `ok` *and* `bad`:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(move |_| {
+/// println!("ok: {:?}", ok);
+/// println!("bad: {:?}", bad);
+/// });
+///
+/// // That closure is fine, but now we can't use `ok` anywhere else,
+/// // since it is owend by the previous task:
+/// // s.spawn(|_| println!("ok: {:?}", ok));
+/// });
+/// ```
+///
+/// While this works, it could be a problem if we want to use `ok` elsewhere.
+/// There are two choices. We can keep the closure as a `move` closure, but
+/// instead of referencing the variable `ok`, we create a shadowed variable that
+/// is a borrow of `ok` and capture *that*:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
+/// s.spawn(move |_| {
+/// println!("ok: {:?}", ok); // captures the shadowed version
+/// println!("bad: {:?}", bad);
+/// });
+///
+/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
+/// // can be shared freely. Note that we need a `move` closure here though,
+/// // because otherwise we'd be trying to borrow the shadowed `ok`,
+/// // and that doesn't outlive `scope`.
+/// s.spawn(move |_| println!("ok: {:?}", ok));
+/// });
+/// ```
+///
+/// Another option is not to use the `move` keyword but instead to take ownership
+/// of individual variables:
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// rayon::scope(|s| {
+/// let bad: Vec<i32> = vec![4, 5, 6];
+/// s.spawn(|_| {
+/// // Transfer ownership of `bad` into a local variable (also named `bad`).
+/// // This will force the closure to take ownership of `bad` from the environment.
+/// let bad = bad;
+/// println!("ok: {:?}", ok); // `ok` is only borrowed.
+/// println!("bad: {:?}", bad); // refers to our local variable, above.
+/// });
+///
+/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
+/// });
+/// ```
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `scope()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `scope()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn()`, it will
+/// execute, even if the spawning task should later panic. `scope()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn scope<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R + Send,
+ R: Send,
+{
+ in_worker(|owner_thread, _| {
+ let scope = Scope::<'scope>::new(owner_thread);
+ unsafe { scope.base.complete(owner_thread, || op(&scope)) }
+ })
+}
+
+/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
+/// closure with a reference to `s`. This closure can then spawn
+/// asynchronous tasks into `s`. Those tasks may run asynchronously with
+/// respect to the closure; they may themselves spawn additional tasks
+/// into `s`. When the closure returns, it will block until all tasks
+/// that have been spawned into `s` complete.
+///
+/// # Task execution
+///
+/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
+/// difference in the order of execution. Consider a similar example:
+///
+/// [`scope()`]: fn.scope.html
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// // point start
+/// rayon::scope_fifo(|s| {
+/// s.spawn_fifo(|s| { // task s.1
+/// s.spawn_fifo(|s| { // task s.1.1
+/// rayon::scope_fifo(|t| {
+/// t.spawn_fifo(|_| ()); // task t.1
+/// t.spawn_fifo(|_| ()); // task t.2
+/// });
+/// });
+/// });
+/// s.spawn_fifo(|s| { // task s.2
+/// });
+/// // point mid
+/// });
+/// // point end
+/// ```
+///
+/// The various tasks that are run will execute roughly like so:
+///
+/// ```notrust
+/// | (start)
+/// |
+/// | (FIFO scope `s` created)
+/// +--------------------+ (task s.1)
+/// +-------+ (task s.2) |
+/// | | +---+ (task s.1.1)
+/// | | | |
+/// | | | | (FIFO scope `t` created)
+/// | | | +----------------+ (task t.1)
+/// | | | +---+ (task t.2) |
+/// | (mid) | | | | |
+/// : | | + <-+------------+ (scope `t` ends)
+/// : | | |
+/// |<------+------------+---+ (scope `s` ends)
+/// |
+/// | (end)
+/// ```
+///
+/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
+/// the thread from which they were spawned, as opposed to `scope()`'s
+/// LIFO. So in this example, we can expect `s.1` to execute before
+/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
+/// FIFO order, as usual. Overall, this has roughly the same order as
+/// the now-deprecated [`breadth_first`] option, except the effect is
+/// isolated to a particular scope. If spawns are intermingled from any
+/// combination of `scope()` and `scope_fifo()`, or from different
+/// threads, their order is only specified with respect to spawns in the
+/// same scope and thread.
+///
+/// For more details on this design, see Rayon [RFC #1].
+///
+/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
+/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `scope_fifo()` or
+/// in any of the spawned jobs, that panic will be propagated and the
+/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
+/// will execute, even if the spawning task should later panic.
+/// `scope_fifo()` returns once all spawned jobs have completed, and any
+/// panics are propagated at that point.
+pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
+ R: Send,
+{
+ in_worker(|owner_thread, _| {
+ let scope = ScopeFifo::<'scope>::new(owner_thread);
+ unsafe { scope.base.complete(owner_thread, || op(&scope)) }
+ })
+}
+
+impl<'scope> Scope<'scope> {
+ fn new(owner_thread: &WorkerThread) -> Self {
+ Scope {
+ base: ScopeBase::new(owner_thread),
+ }
+ }
+
+ /// Spawns a job into the fork-join scope `self`. This job will
+ /// execute sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its
+ /// own reference to the scope `self` as argument. This can be
+ /// used to inject new jobs into `self`.
+ ///
+ /// # Returns
+ ///
+ /// Nothing. The spawned closures cannot pass back values to the
+ /// caller directly, though they can write to local variables on
+ /// the stack (if those variables outlive the scope) or
+ /// communicate through shared channels.
+ ///
+ /// (The intention is to eventualy integrate with Rust futures to
+ /// support spawns of functions that compute a value.)
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// let mut value_a = None;
+ /// let mut value_b = None;
+ /// let mut value_c = None;
+ /// rayon::scope(|s| {
+ /// s.spawn(|s1| {
+ /// // ^ this is the same scope as `s`; this handle `s1`
+ /// // is intended for use by the spawned task,
+ /// // since scope handles cannot cross thread boundaries.
+ ///
+ /// value_a = Some(22);
+ ///
+ /// // the scope `s` will not end until all these tasks are done
+ /// s1.spawn(|_| {
+ /// value_b = Some(44);
+ /// });
+ /// });
+ ///
+ /// s.spawn(|_| {
+ /// value_c = Some(66);
+ /// });
+ /// });
+ /// assert_eq!(value_a, Some(22));
+ /// assert_eq!(value_b, Some(44));
+ /// assert_eq!(value_c, Some(66));
+ /// ```
+ ///
+ /// # See also
+ ///
+ /// The [`scope` function] has more extensive documentation about
+ /// task spawning.
+ ///
+ /// [`scope` function]: fn.scope.html
+ pub fn spawn<BODY>(&self, body: BODY)
+ where
+ BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
+ {
+ self.base.increment();
+ unsafe {
+ let job_ref = Box::new(HeapJob::new(move || {
+ self.base.execute_job(move || body(self))
+ }))
+ .as_job_ref();
+
+ // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+ // thread of this pool, so we can't just push to the local worker thread.
+ self.base.registry.inject_or_push(job_ref);
+ }
+ }
+}
+
+impl<'scope> ScopeFifo<'scope> {
+ fn new(owner_thread: &WorkerThread) -> Self {
+ let num_threads = owner_thread.registry().num_threads();
+ ScopeFifo {
+ base: ScopeBase::new(owner_thread),
+ fifos: (0..num_threads).map(|_| JobFifo::new()).collect(),
+ }
+ }
+
+ /// Spawns a job into the fork-join scope `self`. This job will
+ /// execute sometime before the fork-join scope completes. The
+ /// job is specified as a closure, and this closure receives its
+ /// own reference to the scope `self` as argument. This can be
+ /// used to inject new jobs into `self`.
+ ///
+ /// # See also
+ ///
+ /// This method is akin to [`Scope::spawn()`], but with a FIFO
+ /// priority. The [`scope_fifo` function] has more details about
+ /// this distinction.
+ ///
+ /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
+ /// [`scope_fifo` function]: fn.scope_fifo.html
+ pub fn spawn_fifo<BODY>(&self, body: BODY)
+ where
+ BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
+ {
+ self.base.increment();
+ unsafe {
+ let job_ref = Box::new(HeapJob::new(move || {
+ self.base.execute_job(move || body(self))
+ }))
+ .as_job_ref();
+
+ // If we're in the pool, use our scope's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match self.base.registry.current_thread() {
+ Some(worker) => {
+ let fifo = &self.fifos[worker.index()];
+ worker.push(fifo.push(job_ref));
+ }
+ None => self.base.registry.inject(&[job_ref]),
+ }
+ }
+ }
+}
+
+impl<'scope> ScopeBase<'scope> {
+ /// Creates the base of a new scope for the given worker thread
+ fn new(owner_thread: &WorkerThread) -> Self {
+ ScopeBase {
+ owner_thread_index: owner_thread.index(),
+ registry: owner_thread.registry().clone(),
+ panic: AtomicPtr::new(ptr::null_mut()),
+ job_completed_latch: CountLatch::new(),
+ marker: PhantomData,
+ }
+ }
+
+ fn increment(&self) {
+ self.job_completed_latch.increment();
+ }
+
+ /// Executes `func` as a job, either aborting or executing as
+ /// appropriate.
+ ///
+ /// Unsafe because it must be executed on a worker thread.
+ unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R
+ where
+ FUNC: FnOnce() -> R,
+ {
+ let result = self.execute_job_closure(func);
+ self.steal_till_jobs_complete(owner_thread);
+ result.unwrap() // only None if `op` panicked, and that would have been propagated
+ }
+
+ /// Executes `func` as a job, either aborting or executing as
+ /// appropriate.
+ ///
+ /// Unsafe because it must be executed on a worker thread.
+ unsafe fn execute_job<FUNC>(&self, func: FUNC)
+ where
+ FUNC: FnOnce(),
+ {
+ let _: Option<()> = self.execute_job_closure(func);
+ }
+
+ /// Executes `func` as a job in scope. Adjusts the "job completed"
+ /// counters and also catches any panic and stores it into
+ /// `scope`.
+ ///
+ /// Unsafe because this must be executed on a worker thread.
+ unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ where
+ FUNC: FnOnce() -> R,
+ {
+ match unwind::halt_unwinding(func) {
+ Ok(r) => {
+ self.job_completed_ok();
+ Some(r)
+ }
+ Err(err) => {
+ self.job_panicked(err);
+ None
+ }
+ }
+ }
+
+ unsafe fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
+ // capture the first error we see, free the rest
+ let nil = ptr::null_mut();
+ let mut err = Box::new(err); // box up the fat ptr
+ if self
+ .panic
+ .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ mem::forget(err); // ownership now transferred into self.panic
+ }
+
+ self.job_completed_latch
+ .set_and_tickle_one(&self.registry, self.owner_thread_index);
+ }
+
+ unsafe fn job_completed_ok(&self) {
+ self.job_completed_latch
+ .set_and_tickle_one(&self.registry, self.owner_thread_index);
+ }
+
+ unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) {
+ // wait for job counter to reach 0:
+ owner_thread.wait_until(&self.job_completed_latch);
+
+ // propagate panic, if any occurred; at this point, all
+ // outstanding jobs have completed, so we can use a relaxed
+ // ordering:
+ let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
+ if !panic.is_null() {
+ let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic);
+ unwind::resume_unwinding(*value);
+ }
+ }
+}
+
+impl<'scope> fmt::Debug for Scope<'scope> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Scope")
+ .field("pool_id", &self.base.registry.id())
+ .field("owner_thread_index", &self.base.owner_thread_index)
+ .field("panic", &self.base.panic)
+ .field("job_completed_latch", &self.base.job_completed_latch)
+ .finish()
+ }
+}
+
+impl<'scope> fmt::Debug for ScopeFifo<'scope> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ScopeFifo")
+ .field("num_fifos", &self.fifos.len())
+ .field("pool_id", &self.base.registry.id())
+ .field("owner_thread_index", &self.base.owner_thread_index)
+ .field("panic", &self.base.panic)
+ .field("job_completed_latch", &self.base.job_completed_latch)
+ .finish()
+ }
+}
diff --git a/src/scope/test.rs b/src/scope/test.rs
new file mode 100644
index 0000000..8cb82b6
--- /dev/null
+++ b/src/scope/test.rs
@@ -0,0 +1,515 @@
+use crate::unwind;
+use crate::ThreadPoolBuilder;
+use crate::{scope, scope_fifo, Scope, ScopeFifo};
+use rand::{Rng, SeedableRng};
+use rand_xorshift::XorShiftRng;
+use std::cmp;
+use std::iter::once;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
+use std::vec;
+
+#[test]
+fn scope_empty() {
+ scope(|_| {});
+}
+
+#[test]
+fn scope_result() {
+ let x = scope(|_| 22);
+ assert_eq!(x, 22);
+}
+
+#[test]
+fn scope_two() {
+ let counter = &AtomicUsize::new(0);
+ scope(|s| {
+ s.spawn(move |_| {
+ counter.fetch_add(1, Ordering::SeqCst);
+ });
+ s.spawn(move |_| {
+ counter.fetch_add(10, Ordering::SeqCst);
+ });
+ });
+
+ let v = counter.load(Ordering::SeqCst);
+ assert_eq!(v, 11);
+}
+
+#[test]
+fn scope_divide_and_conquer() {
+ let counter_p = &AtomicUsize::new(0);
+ scope(|s| s.spawn(move |s| divide_and_conquer(s, counter_p, 1024)));
+
+ let counter_s = &AtomicUsize::new(0);
+ divide_and_conquer_seq(&counter_s, 1024);
+
+ let p = counter_p.load(Ordering::SeqCst);
+ let s = counter_s.load(Ordering::SeqCst);
+ assert_eq!(p, s);
+}
+
+fn divide_and_conquer<'scope>(scope: &Scope<'scope>, counter: &'scope AtomicUsize, size: usize) {
+ if size > 1 {
+ scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
+ scope.spawn(move |scope| divide_and_conquer(scope, counter, size / 2));
+ } else {
+ // count the leaves
+ counter.fetch_add(1, Ordering::SeqCst);
+ }
+}
+
+fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) {
+ if size > 1 {
+ divide_and_conquer_seq(counter, size / 2);
+ divide_and_conquer_seq(counter, size / 2);
+ } else {
+ // count the leaves
+ counter.fetch_add(1, Ordering::SeqCst);
+ }
+}
+
+struct Tree<T: Send> {
+ value: T,
+ children: Vec<Tree<T>>,
+}
+
+impl<T: Send> Tree<T> {
+ fn iter<'s>(&'s self) -> vec::IntoIter<&'s T> {
+ once(&self.value)
+ .chain(self.children.iter().flat_map(Tree::iter))
+ .collect::<Vec<_>>() // seems like it shouldn't be needed... but prevents overflow
+ .into_iter()
+ }
+
+ fn update<OP>(&mut self, op: OP)
+ where
+ OP: Fn(&mut T) + Sync,
+ T: Send,
+ {
+ scope(|s| self.update_in_scope(&op, s));
+ }
+
+ fn update_in_scope<'scope, OP>(&'scope mut self, op: &'scope OP, scope: &Scope<'scope>)
+ where
+ OP: Fn(&mut T) + Sync,
+ {
+ let Tree {
+ ref mut value,
+ ref mut children,
+ } = *self;
+ scope.spawn(move |scope| {
+ for child in children {
+ scope.spawn(move |scope| child.update_in_scope(op, scope));
+ }
+ });
+
+ op(value);
+ }
+}
+
+fn random_tree(depth: usize) -> Tree<u32> {
+ assert!(depth > 0);
+ let mut seed = <XorShiftRng as SeedableRng>::Seed::default();
+ (0..).zip(seed.as_mut()).for_each(|(i, x)| *x = i);
+ let mut rng = XorShiftRng::from_seed(seed);
+ random_tree1(depth, &mut rng)
+}
+
+fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> {
+ let children = if depth == 0 {
+ vec![]
+ } else {
+ (0..rng.gen_range(0, 4)) // somewhere between 0 and 3 children at each level
+ .map(|_| random_tree1(depth - 1, rng))
+ .collect()
+ };
+
+ Tree {
+ value: rng.gen_range(0, 1_000_000),
+ children,
+ }
+}
+
+#[test]
+fn update_tree() {
+ let mut tree: Tree<u32> = random_tree(10);
+ let values: Vec<u32> = tree.iter().cloned().collect();
+ tree.update(|v| *v += 1);
+ let new_values: Vec<u32> = tree.iter().cloned().collect();
+ assert_eq!(values.len(), new_values.len());
+ for (&i, &j) in values.iter().zip(&new_values) {
+ assert_eq!(i + 1, j);
+ }
+}
+
+/// Check that if you have a chain of scoped tasks where T0 spawns T1
+/// spawns T2 and so forth down to Tn, the stack space should not grow
+/// linearly with N. We test this by some unsafe hackery and
+/// permitting an approx 10% change with a 10x input change.
+#[test]
+fn linear_stack_growth() {
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let mut max_diff = Mutex::new(0);
+ let bottom_of_stack = 0;
+ scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5));
+ let diff_when_5 = *max_diff.get_mut().unwrap() as f64;
+
+ scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500));
+ let diff_when_500 = *max_diff.get_mut().unwrap() as f64;
+
+ let ratio = diff_when_5 / diff_when_500;
+ assert!(
+ ratio > 0.9 && ratio < 1.1,
+ "stack usage ratio out of bounds: {}",
+ ratio
+ );
+ });
+}
+
+fn the_final_countdown<'scope>(
+ s: &Scope<'scope>,
+ bottom_of_stack: &'scope i32,
+ max: &'scope Mutex<usize>,
+ n: usize,
+) {
+ let top_of_stack = 0;
+ let p = bottom_of_stack as *const i32 as usize;
+ let q = &top_of_stack as *const i32 as usize;
+ let diff = if p > q { p - q } else { q - p };
+
+ let mut data = max.lock().unwrap();
+ *data = cmp::max(diff, *data);
+
+ if n > 0 {
+ s.spawn(move |s| the_final_countdown(s, bottom_of_stack, max, n - 1));
+ }
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_scope() {
+ scope(|_| panic!("Hello, world!"));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_spawn() {
+ scope(|s| s.spawn(|_| panic!("Hello, world!")));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_nested_spawn() {
+ scope(|s| s.spawn(|s| s.spawn(|s| s.spawn(|_| panic!("Hello, world!")))));
+}
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate_nested_scope_spawn() {
+ scope(|s| s.spawn(|_| scope(|s| s.spawn(|_| panic!("Hello, world!")))));
+}
+
+#[test]
+fn panic_propagate_still_execute_1() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| panic!("Hello, world!")); // job A
+ s.spawn(|_| x = true); // job B, should still execute even though A panics
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "job b failed to execute"),
+ }
+}
+
+#[test]
+fn panic_propagate_still_execute_2() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| x = true); // job B, should still execute even though A panics
+ s.spawn(|_| panic!("Hello, world!")); // job A
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "job b failed to execute"),
+ }
+}
+
+#[test]
+fn panic_propagate_still_execute_3() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| x = true); // spanwed job should still execute despite later panic
+ panic!("Hello, world!");
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "panic after spawn, spawn failed to execute"),
+ }
+}
+
+#[test]
+fn panic_propagate_still_execute_4() {
+ let mut x = false;
+ match unwind::halt_unwinding(|| {
+ scope(|s| {
+ s.spawn(|_| panic!("Hello, world!"));
+ x = true;
+ });
+ }) {
+ Ok(_) => panic!("failed to propagate panic"),
+ Err(_) => assert!(x, "panic in spawn tainted scope"),
+ }
+}
+
+macro_rules! test_order {
+ ($scope:ident => $spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$spawn(move |scope| {
+ for j in 0..10 {
+ scope.$spawn(move |_| {
+ vec.lock().unwrap().push(i * 10 + j);
+ });
+ }
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+fn lifo_order() {
+ // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ let vec = test_order!(scope => spawn);
+ let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn fifo_order() {
+ // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ let vec = test_order!(scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+macro_rules! test_nested_order {
+ ($outer_scope:ident => $outer_spawn:ident,
+ $inner_scope:ident => $inner_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $outer_scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$outer_spawn(move |_| {
+ $inner_scope(|scope| {
+ for j in 0..10 {
+ scope.$inner_spawn(move |_| {
+ vec.lock().unwrap().push(i * 10 + j);
+ });
+ }
+ });
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+fn nested_lifo_order() {
+ // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
+ let vec = test_nested_order!(scope => spawn, scope => spawn);
+ let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn nested_fifo_order() {
+ // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
+ let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn nested_lifo_fifo_order() {
+ // LIFO on the outside, FIFO on the inside
+ let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..10)
+ .rev()
+ .flat_map(|i| (0..10).map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn nested_fifo_lifo_order() {
+ // FIFO on the outside, LIFO on the inside
+ let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
+ let expected: Vec<i32> = (0..10)
+ .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+macro_rules! spawn_push {
+ ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{
+ $scope.$spawn(move |_| $vec.lock().unwrap().push($i));
+ }};
+}
+
+/// Test spawns pushing a series of numbers, interleaved
+/// such that negative values are using an inner scope.
+macro_rules! test_mixed_order {
+ ($outer_scope:ident => $outer_spawn:ident,
+ $inner_scope:ident => $inner_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ $outer_scope(|outer_scope| {
+ let vec = &vec;
+ spawn_push!(outer_scope.$outer_spawn, vec, 0);
+ $inner_scope(|inner_scope| {
+ spawn_push!(inner_scope.$inner_spawn, vec, -1);
+ spawn_push!(outer_scope.$outer_spawn, vec, 1);
+ spawn_push!(inner_scope.$inner_spawn, vec, -2);
+ spawn_push!(outer_scope.$outer_spawn, vec, 2);
+ spawn_push!(inner_scope.$inner_spawn, vec, -3);
+ });
+ spawn_push!(outer_scope.$outer_spawn, vec, 3);
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+fn mixed_lifo_order() {
+ // NB: the end of the inner scope makes us execute some of the outer scope
+ // before they've all been spawned, so they're not perfectly LIFO.
+ let vec = test_mixed_order!(scope => spawn, scope => spawn);
+ let expected = vec![-3, 2, -2, 1, -1, 3, 0];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn mixed_fifo_order() {
+ let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
+ let expected = vec![-1, 0, -2, 1, -3, 2, 3];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn mixed_lifo_fifo_order() {
+ // NB: the end of the inner scope makes us execute some of the outer scope
+ // before they've all been spawned, so they're not perfectly LIFO.
+ let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo);
+ let expected = vec![-1, 2, -2, 1, -3, 3, 0];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn mixed_fifo_lifo_order() {
+ let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
+ let expected = vec![-3, 0, -2, 1, -1, 2, 3];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn static_scope() {
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+ let mut range = 0..100;
+ let sum = range.clone().sum();
+ let iter = &mut range;
+
+ COUNTER.store(0, Ordering::Relaxed);
+ scope(|s: &Scope<'static>| {
+ // While we're allowed the locally borrowed iterator,
+ // the spawns must be static.
+ for i in iter {
+ s.spawn(move |_| {
+ COUNTER.fetch_add(i, Ordering::Relaxed);
+ });
+ }
+ });
+
+ assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
+}
+
+#[test]
+fn static_scope_fifo() {
+ static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+ let mut range = 0..100;
+ let sum = range.clone().sum();
+ let iter = &mut range;
+
+ COUNTER.store(0, Ordering::Relaxed);
+ scope_fifo(|s: &ScopeFifo<'static>| {
+ // While we're allowed the locally borrowed iterator,
+ // the spawns must be static.
+ for i in iter {
+ s.spawn_fifo(move |_| {
+ COUNTER.fetch_add(i, Ordering::Relaxed);
+ });
+ }
+ });
+
+ assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
+}
+
+#[test]
+fn mixed_lifetime_scope() {
+ fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
+ scope(move |s: &Scope<'counter>| {
+ // We can borrow 'slice here, but the spawns can only borrow 'counter.
+ for &c in counters {
+ s.spawn(move |_| {
+ c.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ }
+
+ let counter = AtomicUsize::new(0);
+ increment(&[&counter; 100]);
+ assert_eq!(counter.into_inner(), 100);
+}
+
+#[test]
+fn mixed_lifetime_scope_fifo() {
+ fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
+ scope_fifo(move |s: &ScopeFifo<'counter>| {
+ // We can borrow 'slice here, but the spawns can only borrow 'counter.
+ for &c in counters {
+ s.spawn_fifo(move |_| {
+ c.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ }
+
+ let counter = AtomicUsize::new(0);
+ increment(&[&counter; 100]);
+ assert_eq!(counter.into_inner(), 100);
+}
diff --git a/src/sleep/README.md b/src/sleep/README.md
new file mode 100644
index 0000000..c62c397
--- /dev/null
+++ b/src/sleep/README.md
@@ -0,0 +1,219 @@
+# Introduction: the sleep module
+
+The code in this module governs when worker threads should go to
+sleep. The system used in this code was introduced in [Rayon RFC #5].
+There is also a [video walkthrough] available. Both of those may be
+valuable resources to understanding the code, though naturally they
+will also grow stale over time. The comments in this file are
+extracted from the RFC and meant to be kept up to date.
+
+[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5
+[video walkthrough]: https://youtu.be/HvmQsE5M4cY
+
+# The `Sleep` struct
+
+The `Sleep` struct is embedded into each registry. It performs several functions:
+
+* It tracks when workers are awake or asleep.
+* It decides how long a worker should look for work before it goes to sleep,
+ via a callback that is invoked periodically from the worker's search loop.
+* It is notified when latches are set, jobs are published, or other
+ events occur, and it will go and wake the appropriate threads if
+ they are sleeping.
+
+# Thread states
+
+There are three main thread states:
+
+* An **active** thread is one that is actively executing a job.
+* An **idle** thread is one that is searching for work to do. It will be
+ trying to steal work or pop work from the global injector queue.
+* A **sleeping** thread is one that is blocked on a condition variable,
+ waiting to be awoken.
+
+We sometimes refer to the final two states collectively as **inactive**.
+Threads begin as idle but transition to idle and finally sleeping when
+they're unable to find work to do.
+
+## Sleepy threads
+
+There is one other special state worth mentioning. During the idle state,
+threads can get **sleepy**. A sleepy thread is still idle, in that it is still
+searching for work, but it is *about* to go to sleep after it does one more
+search (or some other number, potentially). When a thread enters the sleepy
+state, it signals (via the **jobs event counter**, described below) that it is
+about to go to sleep. If new work is published, this will lead to the counter
+being adjusted. When the thread actually goes to sleep, it will (hopefully, but
+not guaranteed) see that the counter has changed and elect not to sleep, but
+instead to search again. See the section on the **jobs event counter** for more
+details.
+
+# The counters
+
+One of the key structs in the sleep module is `AtomicCounters`, found in
+`counters.rs`. It packs three counters into one atomically managed value:
+
+* Two **thread counters**, which track the number of threads in a particular state.
+* The **jobs event counter**, which is used to signal when new work is available.
+ It (sort of) tracks the number of jobs posted, but not quite, and it can rollover.
+
+## Thread counters
+
+There are two thread counters, one that tracks **inactive** threads and one that
+tracks **sleeping** threads. From this, one can deduce the number of threads
+that are idle by subtracting sleeping threads from inactive threads. We track
+the counters in this way because it permits simpler atomic operations. One can
+increment the number of sleeping threads (and thus decrease the number of idle
+threads) simply by doing one atomic increment, for example. Similarly, one can
+decrease the number of sleeping threads (and increase the number of idle
+threads) through one atomic decrement.
+
+These counters are adjusted as follows:
+
+* When a thread enters the idle state: increment the inactive thread counter.
+* When a thread enters the sleeping state: increment the sleeping thread counter.
+* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
+ * Subtle point: the thread that *awakens* the sleeping thread decrements the
+ counter, not the thread that is *sleeping*. This is because there is a delay
+ between siganling a thread to wake and the thread actually waking:
+ decrementing the counter when awakening the thread means that other threads
+ that may be posting work will see the up-to-date value that much faster.
+* When a thread finds work, exiting the idle state: decrement the inactive
+ thread counter.
+
+## Jobs event counter
+
+The final counter is the **jobs event counter**. The role of this counter is to
+help sleepy threads detect when new work is posted in a lightweight fashion. In
+its simplest form, we would simply have a counter that gets incremented each
+time a new job is posted. This way, when a thread gets sleepy, it could read the
+counter, and then compare to see if the value has changed before it actually
+goes to sleep. But this [turns out to be too expensive] in practice, so we use a
+somewhat more complex scheme.
+
+[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747
+
+The idea is that the counter toggles between two states, depending on whether
+its value is even or odd (or, equivalently, on the value of its low bit):
+
+* Even -- If the low bit is zero, then it means that there has been no new work
+ since the last thread got sleepy.
+* Odd -- If the low bit is one, then it means that new work was posted since
+ the last thread got sleepy.
+
+### New work is posted
+
+When new work is posted, we check the value of the counter: if it is even,
+then we increment it by one, so that it becomes odd.
+
+### Worker thread gets sleepy
+
+When a worker thread gets sleepy, it will read the value of the counter. If the
+counter is odd, it will increment the counter so that it is even. Either way, it
+remembers the final value of the counter. The final value will be used later,
+when the thread is going to sleep. If at that time the counter has not changed,
+then we can assume no new jobs have been posted (though note the remote
+possibility of rollover, discussed in detail below).
+
+# Protocol for a worker thread to post work
+
+The full protocol for a thread to post work is as follows
+
+* If the work is posted into the injection queue, then execute a seq-cst fence (see below).
+* Load the counters, incrementing the JEC if it is even so that it is odd.
+* Check if there are idle threads available to handle this new job. If not,
+ and there are sleeping threads, then wake one or more threads.
+
+# Protocol for a worker thread to fall asleep
+
+The full protocol for a thread to fall asleep is as follows:
+
+* After completing all its jobs, the worker goes idle and begins to
+ search for work. As it searches, it counts "rounds". In each round,
+ it searches all other work threads' queues, plus the 'injector queue' for
+ work injected from the outside. If work is found in this search, the thread
+ becomes active again and hence restarts this protocol from the top.
+* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy`
+ above, remembering the `final_value` of the JEC. It does one more search for work.
+* If no work is found, the thread atomically:
+ * Checks the JEC to see that it has not changed from `final_value`.
+ * If it has, then the thread goes back to searchin for work. We reset to
+ just before we got sleepy, so that we will do one more search
+ before attending to sleep again (rather than searching for many rounds).
+ * Increments the number of sleeping threads by 1.
+* The thread then executes a seq-cst fence operation (see below).
+* The thread then does one final check for injected jobs (see below). If any
+ are available, it returns to the 'pre-sleepy' state as if the JEC had changed.
+* The thread waits to be signaled. Once signaled, it returns to the idle state.
+
+# The jobs event counter and deadlock
+
+As described in the section on the JEC, the main concern around going to sleep
+is avoiding a race condition wherein:
+
+* Thread A looks for work, finds none.
+* Thread B posts work but sees no sleeping threads.
+* Thread A goes to sleep.
+
+The JEC protocol largely prevents this, but due to rollover, this prevention is
+not complete. It is possible -- if unlikely -- that enough activity occurs for
+Thread A to observe the same JEC value that it saw when getting sleepy. If the
+new work being published came from *inside* the thread-pool, then this race
+condition isn't too harmful. It means that we have fewer workers processing the
+work then we should, but we won't deadlock. This seems like an acceptable risk
+given that this is unlikely in practice.
+
+However, if the work was posted as an *external* job, that is a problem. In that
+case, it's possible that all of our workers could go to sleep, and the external
+job would never get processed. To prevent that, the sleeping protocol includes
+one final check to see if the injector queue is empty before fully falling
+asleep. Note that this final check occurs **after** the number of sleeping
+threads has been incremented. We are not concerned therefore with races against
+injections that occur after that increment, only before.
+
+Unfortunately, there is one rather subtle point concerning this final check:
+we wish to avoid the possibility that:
+
+* work is pushed into the injection queue by an outside thread X,
+* the sleepy thread S sees the JEC but it has rolled over and is equal
+* the sleepy thread S reads the injection queue but does not see the work posted by X.
+
+This is possible because the C++ memory model typically offers guarantees of the
+form "if you see the access A, then you must see those other accesses" -- but it
+doesn't guarantee that you will see the access A (i.e., if you think of
+processors with independent caches, you may be operating on very out of date
+cache state).
+
+## Using seq-cst fences to prevent deadlock
+
+To overcome this problem, we have inserted two sequentially consistent fence
+operations into the protocols above:
+
+* One fence occurs after work is posted into the injection queue, but before the
+ counters are read (including the number of sleeping threads).
+ * Note that no fence is needed for work posted to internal queues, since it is ok
+ to overlook work in that case.
+* One fence occurs after the number of sleeping threads is incremented, but
+ before the injection queue is read.
+
+### Proof sketch
+
+What follows is a "proof sketch" that the protocol is deadlock free. We model
+two relevant bits of memory, the job injector queue J and the atomic counters C.
+
+Consider the actions of the injecting thread:
+
+* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics.
+* PushFence: A sequentially consistent fence is executed.
+* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first).
+
+Meanwhile, the sleepy thread does the following:
+
+* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C.
+* SleepFence: A sequentially consistent fence is executed.
+* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics.
+
+Either PushFence or SleepFence must come first:
+
+* If PushFence comes first, then PushJob must be visible to ReadJob.
+* If SleepFence comes first, then IncSleepers is visible to ReadSleepers. \ No newline at end of file
diff --git a/src/sleep/counters.rs b/src/sleep/counters.rs
new file mode 100644
index 0000000..879d32e
--- /dev/null
+++ b/src/sleep/counters.rs
@@ -0,0 +1,271 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub(super) struct AtomicCounters {
+ /// Packs together a number of counters. The counters are ordered as
+ /// follows, from least to most significant bits (here, we assuming
+ /// that [`THREADS_BITS`] is equal to 10):
+ ///
+ /// * Bits 0..10: Stores the number of **sleeping threads**
+ /// * Bits 10..20: Stores the number of **inactive threads**
+ /// * Bits 20..: Stores the **job event counter** (JEC)
+ ///
+ /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
+ /// that the total number of bits (and hence the number of bits used for the
+ /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
+ value: AtomicUsize,
+}
+
+#[derive(Copy, Clone)]
+pub(super) struct Counters {
+ word: usize,
+}
+
+/// A value read from the **Jobs Event Counter**.
+/// See the [`README.md`](README.md) for more
+/// coverage of how the jobs event counter works.
+#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
+pub(super) struct JobsEventCounter(usize);
+
+impl JobsEventCounter {
+ pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
+
+ #[inline]
+ pub(super) fn as_usize(self) -> usize {
+ self.0
+ }
+
+ /// The JEC "is sleepy" if the last thread to increment it was in the
+ /// process of becoming sleepy. This is indicated by its value being *even*.
+ /// When new jobs are posted, they check if the JEC is sleepy, and if so
+ /// they incremented it.
+ #[inline]
+ pub(super) fn is_sleepy(self) -> bool {
+ (self.as_usize() & 1) == 0
+ }
+
+ /// The JEC "is active" if the last thread to increment it was posting new
+ /// work. This is indicated by its value being *odd*. When threads get
+ /// sleepy, they will check if the JEC is active, and increment it.
+ #[inline]
+ pub(super) fn is_active(self) -> bool {
+ !self.is_sleepy()
+ }
+}
+
+/// Number of bits used for the thread counters.
+const THREADS_BITS: usize = 10;
+
+/// Bits to shift to select the sleeping threads
+/// (used with `select_bits`).
+const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
+
+/// Bits to shift to select the inactive threads
+/// (used with `select_bits`).
+const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
+
+/// Bits to shift to select the JEC
+/// (use JOBS_BITS).
+const JEC_SHIFT: usize = 2 * THREADS_BITS;
+
+/// Max value for the thread counters.
+const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
+
+/// Constant that can be added to add one sleeping thread.
+const ONE_SLEEPING: usize = 1;
+
+/// Constant that can be added to add one inactive thread.
+/// An inactive thread is either idle, sleepy, or sleeping.
+const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
+
+/// Constant that can be added to add one to the JEC.
+const ONE_JEC: usize = 1 << JEC_SHIFT;
+
+impl AtomicCounters {
+ #[inline]
+ pub(super) fn new() -> AtomicCounters {
+ AtomicCounters {
+ value: AtomicUsize::new(0),
+ }
+ }
+
+ /// Load and return the current value of the various counters.
+ /// This value can then be given to other method which will
+ /// attempt to update the counters via compare-and-swap.
+ #[inline]
+ pub(super) fn load(&self, ordering: Ordering) -> Counters {
+ Counters::new(self.value.load(ordering))
+ }
+
+ #[inline]
+ fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
+ self.value
+ .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Adds an inactive thread. This cannot fail.
+ ///
+ /// This should be invoked when a thread enters its idle loop looking
+ /// for work. It is decremented when work is found. Note that it is
+ /// not decremented if the thread transitions from idle to sleepy or sleeping;
+ /// so the number of inactive threads is always greater-than-or-equal
+ /// to the number of sleeping threads.
+ #[inline]
+ pub(super) fn add_inactive_thread(&self) {
+ self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
+ }
+
+ /// Increments the jobs event counter if `increment_when`, when applied to
+ /// the current value, is true. Used to toggle the JEC from even (sleepy) to
+ /// odd (active) or vice versa. Returns the final value of the counters, for
+ /// which `increment_when` is guaranteed to return false.
+ pub(super) fn increment_jobs_event_counter_if(
+ &self,
+ increment_when: impl Fn(JobsEventCounter) -> bool,
+ ) -> Counters {
+ loop {
+ let old_value = self.load(Ordering::SeqCst);
+ if increment_when(old_value.jobs_counter()) {
+ let new_value = old_value.increment_jobs_counter();
+ if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
+ return new_value;
+ }
+ } else {
+ return old_value;
+ }
+ }
+ }
+
+ /// Subtracts an inactive thread. This cannot fail. It is invoked
+ /// when a thread finds work and hence becomes active. It returns the
+ /// number of sleeping threads to wake up (if any).
+ ///
+ /// See `add_inactive_thread`.
+ #[inline]
+ pub(super) fn sub_inactive_thread(&self) -> usize {
+ let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
+ debug_assert!(
+ old_value.inactive_threads() > 0,
+ "sub_inactive_thread: old_value {:?} has no inactive threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() <= old_value.inactive_threads(),
+ "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
+ old_value,
+ old_value.sleeping_threads(),
+ old_value.inactive_threads(),
+ );
+
+ // Current heuristic: whenever an inactive thread goes away, if
+ // there are any sleeping threads, wake 'em up.
+ let sleeping_threads = old_value.sleeping_threads();
+ std::cmp::min(sleeping_threads, 2)
+ }
+
+ /// Subtracts a sleeping thread. This cannot fail, but it is only
+ /// safe to do if you you know the number of sleeping threads is
+ /// non-zero (i.e., because you have just awoken a sleeping
+ /// thread).
+ #[inline]
+ pub(super) fn sub_sleeping_thread(&self) {
+ let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
+ debug_assert!(
+ old_value.sleeping_threads() > 0,
+ "sub_sleeping_thread: old_value {:?} had no sleeping threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() <= old_value.inactive_threads(),
+ "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
+ old_value,
+ old_value.sleeping_threads(),
+ old_value.inactive_threads(),
+ );
+ }
+
+ #[inline]
+ pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
+ debug_assert!(
+ old_value.inactive_threads() > 0,
+ "try_add_sleeping_thread: old_value {:?} has no inactive threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() < THREADS_MAX,
+ "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
+ old_value,
+ );
+
+ let mut new_value = old_value;
+ new_value.word += ONE_SLEEPING;
+
+ self.try_exchange(old_value, new_value, Ordering::SeqCst)
+ }
+}
+
+#[inline]
+fn select_thread(word: usize, shift: usize) -> usize {
+ ((word >> shift) as usize) & THREADS_MAX
+}
+
+#[inline]
+fn select_jec(word: usize) -> usize {
+ (word >> JEC_SHIFT) as usize
+}
+
+impl Counters {
+ #[inline]
+ fn new(word: usize) -> Counters {
+ Counters { word }
+ }
+
+ #[inline]
+ fn increment_jobs_counter(self) -> Counters {
+ // We can freely add to JEC because it occupies the most significant bits.
+ // Thus it doesn't overflow into the other counters, just wraps itself.
+ Counters {
+ word: self.word.wrapping_add(ONE_JEC),
+ }
+ }
+
+ #[inline]
+ pub(super) fn jobs_counter(self) -> JobsEventCounter {
+ JobsEventCounter(select_jec(self.word))
+ }
+
+ /// The number of threads that are not actively
+ /// executing work. They may be idle, sleepy, or asleep.
+ #[inline]
+ pub(super) fn inactive_threads(self) -> usize {
+ select_thread(self.word, INACTIVE_SHIFT)
+ }
+
+ #[inline]
+ pub(super) fn awake_but_idle_threads(self) -> usize {
+ debug_assert!(
+ self.sleeping_threads() <= self.inactive_threads(),
+ "sleeping threads: {} > raw idle threads {}",
+ self.sleeping_threads(),
+ self.inactive_threads()
+ );
+ self.inactive_threads() - self.sleeping_threads()
+ }
+
+ #[inline]
+ pub(super) fn sleeping_threads(self) -> usize {
+ select_thread(self.word, SLEEPING_SHIFT)
+ }
+}
+
+impl std::fmt::Debug for Counters {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let word = format!("{:016x}", self.word);
+ fmt.debug_struct("Counters")
+ .field("word", &word)
+ .field("jobs", &self.jobs_counter().0)
+ .field("inactive", &self.inactive_threads())
+ .field("sleeping", &self.sleeping_threads())
+ .finish()
+ }
+}
diff --git a/src/sleep/mod.rs b/src/sleep/mod.rs
new file mode 100644
index 0000000..2aa262c
--- /dev/null
+++ b/src/sleep/mod.rs
@@ -0,0 +1,392 @@
+//! Code that decides when workers should go to sleep. See README.md
+//! for an overview.
+
+use crate::latch::CoreLatch;
+use crate::log::Event::*;
+use crate::log::Logger;
+use crossbeam_utils::CachePadded;
+use std::sync::atomic::Ordering;
+use std::sync::{Condvar, Mutex};
+use std::thread;
+use std::usize;
+
+mod counters;
+use self::counters::{AtomicCounters, JobsEventCounter};
+
+/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
+/// of workers. It has callbacks that are invoked periodically at significant events,
+/// such as when workers are looping and looking for work, when latches are set, or when
+/// jobs are published, and it either blocks threads or wakes them in response to these
+/// events. See the [`README.md`] in this module for more details.
+///
+/// [`README.md`] README.md
+pub(super) struct Sleep {
+ logger: Logger,
+
+ /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
+ /// them block.
+ worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
+
+ counters: AtomicCounters,
+}
+
+/// An instance of this struct is created when a thread becomes idle.
+/// It is consumed when the thread finds work, and passed by `&mut`
+/// reference for operations that preserve the idle state. (In other
+/// words, producing one of these structs is evidence the thread is
+/// idle.) It tracks state such as how long the thread has been idle.
+pub(super) struct IdleState {
+ /// What is worker index of the idle thread?
+ worker_index: usize,
+
+ /// How many rounds have we been circling without sleeping?
+ rounds: u32,
+
+ /// Once we become sleepy, what was the sleepy counter value?
+ /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
+ jobs_counter: JobsEventCounter,
+}
+
+/// The "sleep state" for an individual worker.
+#[derive(Default)]
+struct WorkerSleepState {
+ /// Set to true when the worker goes to sleep; set to false when
+ /// the worker is notified or when it wakes.
+ is_blocked: Mutex<bool>,
+
+ condvar: Condvar,
+}
+
+const ROUNDS_UNTIL_SLEEPY: u32 = 32;
+const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
+
+impl Sleep {
+ pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
+ Sleep {
+ logger,
+ worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
+ counters: AtomicCounters::new(),
+ }
+ }
+
+ #[inline]
+ pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
+ self.logger.log(|| ThreadIdle {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ self.counters.add_inactive_thread();
+
+ IdleState {
+ worker_index,
+ rounds: 0,
+ jobs_counter: JobsEventCounter::DUMMY,
+ }
+ }
+
+ #[inline]
+ pub(super) fn work_found(&self, idle_state: IdleState) {
+ self.logger.log(|| ThreadFoundWork {
+ worker: idle_state.worker_index,
+ yields: idle_state.rounds,
+ });
+
+ // If we were the last idle thread and other threads are still sleeping,
+ // then we should wake up another thread.
+ let threads_to_wake = self.counters.sub_inactive_thread();
+ self.wake_any_threads(threads_to_wake as u32);
+ }
+
+ #[inline]
+ pub(super) fn no_work_found(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
+ thread::yield_now();
+ idle_state.rounds += 1;
+ } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
+ idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else {
+ debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
+ self.sleep(idle_state, latch, has_injected_jobs);
+ }
+ }
+
+ #[cold]
+ fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
+ let counters = self
+ .counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_active);
+ let jobs_counter = counters.jobs_counter();
+ self.logger.log(|| ThreadSleepy {
+ worker: worker_index,
+ jobs_counter: jobs_counter.as_usize(),
+ });
+ jobs_counter
+ }
+
+ #[cold]
+ fn sleep(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ let worker_index = idle_state.worker_index;
+
+ if !latch.get_sleepy() {
+ self.logger.log(|| ThreadSleepInterruptedByLatch {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ return;
+ }
+
+ let sleep_state = &self.worker_sleep_states[worker_index];
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ debug_assert!(!*is_blocked);
+
+ // Our latch was signalled. We should wake back up fully as we
+ // wil have some stuff to do.
+ if !latch.fall_asleep() {
+ self.logger.log(|| ThreadSleepInterruptedByLatch {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ idle_state.wake_fully();
+ return;
+ }
+
+ loop {
+ let counters = self.counters.load(Ordering::SeqCst);
+
+ // Check if the JEC has changed since we got sleepy.
+ debug_assert!(idle_state.jobs_counter.is_sleepy());
+ if counters.jobs_counter() != idle_state.jobs_counter {
+ // JEC has changed, so a new job was posted, but for some reason
+ // we didn't see it. We should return to just before the SLEEPY
+ // state so we can do another search and (if we fail to find
+ // work) go back to sleep.
+ self.logger.log(|| ThreadSleepInterruptedByJob {
+ worker: worker_index,
+ });
+
+ idle_state.wake_partly();
+ latch.wake_up();
+ return;
+ }
+
+ // Otherwise, let's move from IDLE to SLEEPING.
+ if self.counters.try_add_sleeping_thread(counters) {
+ break;
+ }
+ }
+
+ // Successfully registered as asleep.
+
+ self.logger.log(|| ThreadSleeping {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ // We have one last check for injected jobs to do. This protects against
+ // deadlock in the very unlikely event that
+ //
+ // - an external job is being injected while we are sleepy
+ // - that job triggers the rollover over the JEC such that we don't see it
+ // - we are the last active worker thread
+ std::sync::atomic::fence(Ordering::SeqCst);
+ if has_injected_jobs() {
+ // If we see an externally injected job, then we have to 'wake
+ // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
+ // the one that wakes us.)
+ self.counters.sub_sleeping_thread();
+ } else {
+ // If we don't see an injected job (the normal case), then flag
+ // ourselves as asleep and wait till we are notified.
+ //
+ // (Note that `is_blocked` is held under a mutex and the mutex was
+ // acquired *before* we incremented the "sleepy counter". This means
+ // that whomever is coming to wake us will have to wait until we
+ // release the mutex in the call to `wait`, so they will see this
+ // boolean as true.)
+ *is_blocked = true;
+ while *is_blocked {
+ is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
+ }
+ }
+
+ // Update other state:
+ idle_state.wake_fully();
+ latch.wake_up();
+
+ self.logger.log(|| ThreadAwoken {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+ }
+
+ /// Notify the given thread that it should wake up (if it is
+ /// sleeping). When this method is invoked, we typically know the
+ /// thread is asleep, though in rare cases it could have been
+ /// awoken by (e.g.) new work having been posted.
+ pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+ self.wake_specific_thread(target_worker_index);
+ }
+
+ /// Signals that `num_jobs` new jobs were injected into the thread
+ /// pool from outside. This function will ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary.
+ ///
+ /// # Parameters
+ ///
+ /// - `source_worker_index` -- index of the thread that did the
+ /// push, or `usize::MAX` if this came from outside the thread
+ /// pool -- it is used only for logging.
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_injected_jobs(
+ &self,
+ source_worker_index: usize,
+ num_jobs: u32,
+ queue_was_empty: bool,
+ ) {
+ // This fence is needed to guarantee that threads
+ // as they are about to fall asleep, observe any
+ // new jobs that may have been injected.
+ std::sync::atomic::fence(Ordering::SeqCst);
+
+ self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ }
+
+ /// Signals that `num_jobs` new jobs were pushed onto a thread's
+ /// local deque. This function will try to ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary. However, this is not guaranteed: under certain
+ /// race conditions, the function may fail to wake any new
+ /// threads; in that case the existing thread should eventually
+ /// pop the job.
+ ///
+ /// # Parameters
+ ///
+ /// - `source_worker_index` -- index of the thread that did the
+ /// push, or `usize::MAX` if this came from outside the thread
+ /// pool -- it is used only for logging.
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_internal_jobs(
+ &self,
+ source_worker_index: usize,
+ num_jobs: u32,
+ queue_was_empty: bool,
+ ) {
+ self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ }
+
+ /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
+ #[inline]
+ fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
+ // Read the counters and -- if sleepy workers have announced themselves
+ // -- announce that there is now work available. The final value of `counters`
+ // with which we exit the loop thus corresponds to a state when
+ let counters = self
+ .counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
+ let num_awake_but_idle = counters.awake_but_idle_threads();
+ let num_sleepers = counters.sleeping_threads();
+
+ self.logger.log(|| JobThreadCounts {
+ worker: source_worker_index,
+ num_idle: num_awake_but_idle as u16,
+ num_sleepers: num_sleepers as u16,
+ });
+
+ if num_sleepers == 0 {
+ // nobody to wake
+ return;
+ }
+
+ // Promote from u16 to u32 so we can interoperate with
+ // num_jobs more easily.
+ let num_awake_but_idle = num_awake_but_idle as u32;
+ let num_sleepers = num_sleepers as u32;
+
+ // If the queue is non-empty, then we always wake up a worker
+ // -- clearly the existing idle jobs aren't enough. Otherwise,
+ // check to see if we have enough idle workers.
+ if !queue_was_empty {
+ let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ } else if num_awake_but_idle < num_jobs {
+ let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ }
+ }
+
+ #[cold]
+ fn wake_any_threads(&self, mut num_to_wake: u32) {
+ if num_to_wake > 0 {
+ for i in 0..self.worker_sleep_states.len() {
+ if self.wake_specific_thread(i) {
+ num_to_wake -= 1;
+ if num_to_wake == 0 {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ fn wake_specific_thread(&self, index: usize) -> bool {
+ let sleep_state = &self.worker_sleep_states[index];
+
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ if *is_blocked {
+ *is_blocked = false;
+ sleep_state.condvar.notify_one();
+
+ // When the thread went to sleep, it will have incremented
+ // this value. When we wake it, its our job to decrement
+ // it. We could have the thread do it, but that would
+ // introduce a delay between when the thread was
+ // *notified* and when this counter was decremented. That
+ // might mislead people with new work into thinking that
+ // there are sleeping threads that they should try to
+ // wake, when in fact there is nothing left for them to
+ // do.
+ self.counters.sub_sleeping_thread();
+
+ self.logger.log(|| ThreadNotify { worker: index });
+
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl IdleState {
+ fn wake_fully(&mut self) {
+ self.rounds = 0;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+
+ fn wake_partly(&mut self) {
+ self.rounds = ROUNDS_UNTIL_SLEEPY;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+}
diff --git a/src/spawn/mod.rs b/src/spawn/mod.rs
new file mode 100644
index 0000000..0006103
--- /dev/null
+++ b/src/spawn/mod.rs
@@ -0,0 +1,168 @@
+use crate::job::*;
+use crate::registry::Registry;
+use crate::unwind;
+use std::mem;
+use std::sync::Arc;
+
+/// Fires off a task into the Rayon threadpool in the "static" or
+/// "global" scope. Just like a standard thread, this task is not
+/// tied to the current stack frame, and hence it cannot hold any
+/// references other than those with `'static` lifetime. If you want
+/// to spawn a task that references stack data, use [the `scope()`
+/// function][scope] to create a scope.
+///
+/// [scope]: fn.scope.html
+///
+/// Since tasks spawned with this function cannot hold references into
+/// the enclosing stack frame, you almost certainly want to use a
+/// `move` closure as their argument (otherwise, the closure will
+/// typically hold references to any variables from the enclosing
+/// function that you happen to use).
+///
+/// This API assumes that the closure is executed purely for its
+/// side-effects (i.e., it might send messages, modify data protected
+/// by a mutex, or some such thing).
+///
+/// There is no guaranteed order of execution for spawns, given that
+/// other threads may steal tasks at any time. However, they are
+/// generally prioritized in a LIFO order on the thread from which
+/// they were spawned. Other threads always steal from the other end of
+/// the deque, like FIFO order. The idea is that "recent" tasks are
+/// most likely to be fresh in the local CPU's cache, while other
+/// threads can steal older "stale" tasks. For an alternate approach,
+/// consider [`spawn_fifo()`] instead.
+///
+/// [`spawn_fifo()`]: fn.spawn_fifo.html
+///
+/// # Panic handling
+///
+/// If this closure should panic, the resulting panic will be
+/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
+/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
+/// details.
+///
+/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
+///
+/// # Examples
+///
+/// This code creates a Rayon task that increments a global counter.
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
+///
+/// static GLOBAL_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
+///
+/// rayon::spawn(move || {
+/// GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
+/// });
+/// ```
+pub fn spawn<F>(func: F)
+where
+ F: FnOnce() + Send + 'static,
+{
+ // We assert that current registry has not terminated.
+ unsafe { spawn_in(func, &Registry::current()) }
+}
+
+/// Spawns an asynchronous job in `registry.`
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
+where
+ F: FnOnce() + Send + 'static,
+{
+ // We assert that this does not hold any references (we know
+ // this because of the `'static` bound in the inferface);
+ // moreover, we assert that the code below is not supposed to
+ // be able to panic, and hence the data won't leak but will be
+ // enqueued into some deque for later execution.
+ let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
+ let job_ref = spawn_job(func, registry);
+ registry.inject_or_push(job_ref);
+ mem::forget(abort_guard);
+}
+
+unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef
+where
+ F: FnOnce() + Send + 'static,
+{
+ // Ensure that registry cannot terminate until this job has
+ // executed. This ref is decremented at the (*) below.
+ registry.increment_terminate_count();
+
+ Box::new(HeapJob::new({
+ let registry = registry.clone();
+ move || {
+ match unwind::halt_unwinding(func) {
+ Ok(()) => {}
+ Err(err) => {
+ registry.handle_panic(err);
+ }
+ }
+ registry.terminate(); // (*) permit registry to terminate now
+ }
+ }))
+ .as_job_ref()
+}
+
+/// Fires off a task into the Rayon threadpool in the "static" or
+/// "global" scope. Just like a standard thread, this task is not
+/// tied to the current stack frame, and hence it cannot hold any
+/// references other than those with `'static` lifetime. If you want
+/// to spawn a task that references stack data, use [the `scope_fifo()`
+/// function](fn.scope_fifo.html) to create a scope.
+///
+/// The behavior is essentially the same as [the `spawn`
+/// function](fn.spawn.html), except that calls from the same thread
+/// will be prioritized in FIFO order. This is similar to the now-
+/// deprecated [`breadth_first`] option, except the effect is isolated
+/// to relative `spawn_fifo` calls, not all threadpool tasks.
+///
+/// For more details on this design, see Rayon [RFC #1].
+///
+/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
+/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+///
+/// # Panic handling
+///
+/// If this closure should panic, the resulting panic will be
+/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
+/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
+/// details.
+///
+/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
+pub fn spawn_fifo<F>(func: F)
+where
+ F: FnOnce() + Send + 'static,
+{
+ // We assert that current registry has not terminated.
+ unsafe { spawn_fifo_in(func, &Registry::current()) }
+}
+
+/// Spawns an asynchronous FIFO job in `registry.`
+///
+/// Unsafe because `registry` must not yet have terminated.
+pub(super) unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>)
+where
+ F: FnOnce() + Send + 'static,
+{
+ // We assert that this does not hold any references (we know
+ // this because of the `'static` bound in the inferface);
+ // moreover, we assert that the code below is not supposed to
+ // be able to panic, and hence the data won't leak but will be
+ // enqueued into some deque for later execution.
+ let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
+ let job_ref = spawn_job(func, registry);
+
+ // If we're in the pool, use our thread's private fifo for this thread to execute
+ // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+ match registry.current_thread() {
+ Some(worker) => worker.push_fifo(job_ref),
+ None => registry.inject(&[job_ref]),
+ }
+ mem::forget(abort_guard);
+}
+
+#[cfg(test)]
+mod test;
diff --git a/src/spawn/test.rs b/src/spawn/test.rs
new file mode 100644
index 0000000..9c59754
--- /dev/null
+++ b/src/spawn/test.rs
@@ -0,0 +1,243 @@
+use crate::scope;
+use std::any::Any;
+use std::sync::mpsc::channel;
+use std::sync::Mutex;
+
+use super::{spawn, spawn_fifo};
+use crate::ThreadPoolBuilder;
+
+#[test]
+fn spawn_then_join_in_worker() {
+ let (tx, rx) = channel();
+ scope(move |_| {
+ spawn(move || tx.send(22).unwrap());
+ });
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn spawn_then_join_outside_worker() {
+ let (tx, rx) = channel();
+ spawn(move || tx.send(22).unwrap());
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn panic_fwd() {
+ let (tx, rx) = channel();
+
+ let tx = Mutex::new(tx);
+ let panic_handler = move |err: Box<dyn Any + Send>| {
+ let tx = tx.lock().unwrap();
+ if let Some(&msg) = err.downcast_ref::<&str>() {
+ if msg == "Hello, world!" {
+ tx.send(1).unwrap();
+ } else {
+ tx.send(2).unwrap();
+ }
+ } else {
+ tx.send(3).unwrap();
+ }
+ };
+
+ let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
+
+ builder
+ .build()
+ .unwrap()
+ .spawn(move || panic!("Hello, world!"));
+
+ assert_eq!(1, rx.recv().unwrap());
+}
+
+/// Test what happens when the thread-pool is dropped but there are
+/// still active asynchronous tasks. We expect the thread-pool to stay
+/// alive and executing until those threads are complete.
+#[test]
+fn termination_while_things_are_executing() {
+ let (tx0, rx0) = channel();
+ let (tx1, rx1) = channel();
+
+ // Create a thread-pool and spawn some code in it, but then drop
+ // our reference to it.
+ {
+ let thread_pool = ThreadPoolBuilder::new().build().unwrap();
+ thread_pool.spawn(move || {
+ let data = rx0.recv().unwrap();
+
+ // At this point, we know the "main" reference to the
+ // `ThreadPool` has been dropped, but there are still
+ // active threads. Launch one more.
+ spawn(move || {
+ tx1.send(data).unwrap();
+ });
+ });
+ }
+
+ tx0.send(22).unwrap();
+ let v = rx1.recv().unwrap();
+ assert_eq!(v, 22);
+}
+
+#[test]
+fn custom_panic_handler_and_spawn() {
+ let (tx, rx) = channel();
+
+ // Create a parallel closure that will send panics on the
+ // channel; since the closure is potentially executed in parallel
+ // with itself, we have to wrap `tx` in a mutex.
+ let tx = Mutex::new(tx);
+ let panic_handler = move |e: Box<dyn Any + Send>| {
+ tx.lock().unwrap().send(e).unwrap();
+ };
+
+ // Execute an async that will panic.
+ let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
+ builder.build().unwrap().spawn(move || {
+ panic!("Hello, world!");
+ });
+
+ // Check that we got back the panic we expected.
+ let error = rx.recv().unwrap();
+ if let Some(&msg) = error.downcast_ref::<&str>() {
+ assert_eq!(msg, "Hello, world!");
+ } else {
+ panic!("did not receive a string from panic handler");
+ }
+}
+
+#[test]
+fn custom_panic_handler_and_nested_spawn() {
+ let (tx, rx) = channel();
+
+ // Create a parallel closure that will send panics on the
+ // channel; since the closure is potentially executed in parallel
+ // with itself, we have to wrap `tx` in a mutex.
+ let tx = Mutex::new(tx);
+ let panic_handler = move |e| {
+ tx.lock().unwrap().send(e).unwrap();
+ };
+
+ // Execute an async that will (eventually) panic.
+ const PANICS: usize = 3;
+ let builder = ThreadPoolBuilder::new().panic_handler(panic_handler);
+ builder.build().unwrap().spawn(move || {
+ // launch 3 nested spawn-asyncs; these should be in the same
+ // thread-pool and hence inherit the same panic handler
+ for _ in 0..PANICS {
+ spawn(move || {
+ panic!("Hello, world!");
+ });
+ }
+ });
+
+ // Check that we get back the panics we expected.
+ for _ in 0..PANICS {
+ let error = rx.recv().unwrap();
+ if let Some(&msg) = error.downcast_ref::<&str>() {
+ assert_eq!(msg, "Hello, world!");
+ } else {
+ panic!("did not receive a string from panic handler");
+ }
+ }
+}
+
+macro_rules! test_order {
+ ($outer_spawn:ident, $inner_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ let (tx, rx) = channel();
+ pool.install(move || {
+ for i in 0..10 {
+ let tx = tx.clone();
+ $outer_spawn(move || {
+ for j in 0..10 {
+ let tx = tx.clone();
+ $inner_spawn(move || {
+ tx.send(i * 10 + j).unwrap();
+ });
+ }
+ });
+ }
+ });
+ rx.iter().collect::<Vec<i32>>()
+ }};
+}
+
+#[test]
+fn lifo_order() {
+ // In the absense of stealing, `spawn()` jobs on a thread will run in LIFO order.
+ let vec = test_order!(spawn, spawn);
+ let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn fifo_order() {
+ // In the absense of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
+ let vec = test_order!(spawn_fifo, spawn_fifo);
+ let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn lifo_fifo_order() {
+ // LIFO on the outside, FIFO on the inside
+ let vec = test_order!(spawn, spawn_fifo);
+ let expected: Vec<i32> = (0..10)
+ .rev()
+ .flat_map(|i| (0..10).map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn fifo_lifo_order() {
+ // FIFO on the outside, LIFO on the inside
+ let vec = test_order!(spawn_fifo, spawn);
+ let expected: Vec<i32> = (0..10)
+ .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
+ .collect();
+ assert_eq!(vec, expected);
+}
+
+macro_rules! spawn_send {
+ ($spawn:ident, $tx:ident, $i:expr) => {{
+ let tx = $tx.clone();
+ $spawn(move || tx.send($i).unwrap());
+ }};
+}
+
+/// Test mixed spawns pushing a series of numbers, interleaved such
+/// such that negative values are using the second kind of spawn.
+macro_rules! test_mixed_order {
+ ($pos_spawn:ident, $neg_spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ let (tx, rx) = channel();
+ pool.install(move || {
+ spawn_send!($pos_spawn, tx, 0);
+ spawn_send!($neg_spawn, tx, -1);
+ spawn_send!($pos_spawn, tx, 1);
+ spawn_send!($neg_spawn, tx, -2);
+ spawn_send!($pos_spawn, tx, 2);
+ spawn_send!($neg_spawn, tx, -3);
+ spawn_send!($pos_spawn, tx, 3);
+ });
+ rx.iter().collect::<Vec<i32>>()
+ }};
+}
+
+#[test]
+fn mixed_lifo_fifo_order() {
+ let vec = test_mixed_order!(spawn, spawn_fifo);
+ let expected = vec![3, -1, 2, -2, 1, -3, 0];
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn mixed_fifo_lifo_order() {
+ let vec = test_mixed_order!(spawn_fifo, spawn);
+ let expected = vec![0, -3, 1, -2, 2, -1, 3];
+ assert_eq!(vec, expected);
+}
diff --git a/src/test.rs b/src/test.rs
new file mode 100644
index 0000000..015d3ec
--- /dev/null
+++ b/src/test.rs
@@ -0,0 +1,195 @@
+#![cfg(test)]
+
+#[allow(deprecated)]
+use crate::Configuration;
+use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Barrier};
+
+#[test]
+fn worker_thread_index() {
+ let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ assert_eq!(pool.current_num_threads(), 22);
+ assert_eq!(pool.current_thread_index(), None);
+ let index = pool.install(|| pool.current_thread_index().unwrap());
+ assert!(index < 22);
+}
+
+#[test]
+fn start_callback_called() {
+ let n_threads = 16;
+ let n_called = Arc::new(AtomicUsize::new(0));
+ // Wait for all the threads in the pool plus the one running tests.
+ let barrier = Arc::new(Barrier::new(n_threads + 1));
+
+ let b = barrier.clone();
+ let nc = n_called.clone();
+ let start_handler = move |_| {
+ nc.fetch_add(1, Ordering::SeqCst);
+ b.wait();
+ };
+
+ let conf = ThreadPoolBuilder::new()
+ .num_threads(n_threads)
+ .start_handler(start_handler);
+ let _ = conf.build().unwrap();
+
+ // Wait for all the threads to have been scheduled to run.
+ barrier.wait();
+
+ // The handler must have been called on every started thread.
+ assert_eq!(n_called.load(Ordering::SeqCst), n_threads);
+}
+
+#[test]
+fn exit_callback_called() {
+ let n_threads = 16;
+ let n_called = Arc::new(AtomicUsize::new(0));
+ // Wait for all the threads in the pool plus the one running tests.
+ let barrier = Arc::new(Barrier::new(n_threads + 1));
+
+ let b = barrier.clone();
+ let nc = n_called.clone();
+ let exit_handler = move |_| {
+ nc.fetch_add(1, Ordering::SeqCst);
+ b.wait();
+ };
+
+ let conf = ThreadPoolBuilder::new()
+ .num_threads(n_threads)
+ .exit_handler(exit_handler);
+ {
+ let _ = conf.build().unwrap();
+ // Drop the pool so it stops the running threads.
+ }
+
+ // Wait for all the threads to have been scheduled to run.
+ barrier.wait();
+
+ // The handler must have been called on every exiting thread.
+ assert_eq!(n_called.load(Ordering::SeqCst), n_threads);
+}
+
+#[test]
+fn handler_panics_handled_correctly() {
+ let n_threads = 16;
+ let n_called = Arc::new(AtomicUsize::new(0));
+ // Wait for all the threads in the pool plus the one running tests.
+ let start_barrier = Arc::new(Barrier::new(n_threads + 1));
+ let exit_barrier = Arc::new(Barrier::new(n_threads + 1));
+
+ let start_handler = move |_| {
+ panic!("ensure panic handler is called when starting");
+ };
+ let exit_handler = move |_| {
+ panic!("ensure panic handler is called when exiting");
+ };
+
+ let sb = start_barrier.clone();
+ let eb = exit_barrier.clone();
+ let nc = n_called.clone();
+ let panic_handler = move |_| {
+ let val = nc.fetch_add(1, Ordering::SeqCst);
+ if val < n_threads {
+ sb.wait();
+ } else {
+ eb.wait();
+ }
+ };
+
+ let conf = ThreadPoolBuilder::new()
+ .num_threads(n_threads)
+ .start_handler(start_handler)
+ .exit_handler(exit_handler)
+ .panic_handler(panic_handler);
+ {
+ let _ = conf.build().unwrap();
+
+ // Wait for all the threads to start, panic in the start handler,
+ // and been taken care of by the panic handler.
+ start_barrier.wait();
+
+ // Drop the pool so it stops the running threads.
+ }
+
+ // Wait for all the threads to exit, panic in the exit handler,
+ // and been taken care of by the panic handler.
+ exit_barrier.wait();
+
+ // The panic handler must have been called twice on every thread.
+ assert_eq!(n_called.load(Ordering::SeqCst), 2 * n_threads);
+}
+
+#[test]
+#[allow(deprecated)]
+fn check_config_build() {
+ let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ assert_eq!(pool.current_num_threads(), 22);
+}
+
+/// Helper used by check_error_send_sync to ensure ThreadPoolBuildError is Send + Sync
+fn _send_sync<T: Send + Sync>() {}
+
+#[test]
+fn check_error_send_sync() {
+ _send_sync::<ThreadPoolBuildError>();
+}
+
+#[allow(deprecated)]
+#[test]
+fn configuration() {
+ let start_handler = move |_| {};
+ let exit_handler = move |_| {};
+ let panic_handler = move |_| {};
+ let thread_name = move |i| format!("thread_name_{}", i);
+
+ // Ensure we can call all public methods on Configuration
+ Configuration::new()
+ .thread_name(thread_name)
+ .num_threads(5)
+ .panic_handler(panic_handler)
+ .stack_size(4e6 as usize)
+ .breadth_first()
+ .start_handler(start_handler)
+ .exit_handler(exit_handler)
+ .build()
+ .unwrap();
+}
+
+#[test]
+fn default_pool() {
+ ThreadPoolBuilder::default().build().unwrap();
+}
+
+/// Test that custom spawned threads get their `WorkerThread` cleared once
+/// the pool is done with them, allowing them to be used with rayon again
+/// later. e.g. WebAssembly want to have their own pool of available threads.
+#[test]
+fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
+ let n_threads = 5;
+ let mut handles = vec![];
+ let pool = ThreadPoolBuilder::new()
+ .num_threads(n_threads)
+ .spawn_handler(|thread| {
+ let handle = std::thread::spawn(move || {
+ thread.run();
+
+ // Afterward, the current thread shouldn't be set anymore.
+ assert_eq!(crate::current_thread_index(), None);
+ });
+ handles.push(handle);
+ Ok(())
+ })
+ .build()?;
+ assert_eq!(handles.len(), n_threads);
+
+ pool.install(|| assert!(crate::current_thread_index().is_some()));
+ drop(pool);
+
+ // Wait for all threads to make their assertions and exit
+ for handle in handles {
+ handle.join().unwrap();
+ }
+
+ Ok(())
+}
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
new file mode 100644
index 0000000..2209f63
--- /dev/null
+++ b/src/thread_pool/mod.rs
@@ -0,0 +1,315 @@
+//! Contains support for user-managed thread pools, represented by the
+//! the [`ThreadPool`] type (see that struct for details).
+//!
+//! [`ThreadPool`]: struct.ThreadPool.html
+
+use crate::join;
+use crate::registry::{Registry, ThreadSpawn, WorkerThread};
+use crate::spawn;
+#[allow(deprecated)]
+use crate::Configuration;
+use crate::{scope, Scope};
+use crate::{scope_fifo, ScopeFifo};
+use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
+use std::error::Error;
+use std::fmt;
+use std::sync::Arc;
+
+mod test;
+
+/// Represents a user created [thread-pool].
+///
+/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
+/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
+/// execute functions explicitly within this [`ThreadPool`] using
+/// [`ThreadPool::install()`]. By contrast, top level rayon functions
+/// (like `join()`) will execute implicitly within the current thread-pool.
+///
+///
+/// ## Creating a ThreadPool
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
+/// ```
+///
+/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
+/// threads. In addition, any other rayon operations called inside of `install()` will also
+/// execute in the context of the `ThreadPool`.
+///
+/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
+/// they will complete executing any remaining work that you have spawned, and automatically
+/// terminate.
+///
+///
+/// [thread-pool]: https://en.wikipedia.org/wiki/Thread_pool
+/// [`ThreadPool`]: struct.ThreadPool.html
+/// [`ThreadPool::new()`]: struct.ThreadPool.html#method.new
+/// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
+/// [`ThreadPoolBuilder::build()`]: struct.ThreadPoolBuilder.html#method.build
+/// [`ThreadPool::install()`]: struct.ThreadPool.html#method.install
+pub struct ThreadPool {
+ registry: Arc<Registry>,
+}
+
+impl ThreadPool {
+ #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
+ #[allow(deprecated)]
+ /// Deprecated in favor of `ThreadPoolBuilder::build`.
+ pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<dyn Error>> {
+ Self::build(configuration.into_builder()).map_err(Box::from)
+ }
+
+ pub(super) fn build<S>(
+ builder: ThreadPoolBuilder<S>,
+ ) -> Result<ThreadPool, ThreadPoolBuildError>
+ where
+ S: ThreadSpawn,
+ {
+ let registry = Registry::new(builder)?;
+ Ok(ThreadPool { registry })
+ }
+
+ /// Executes `op` within the threadpool. Any attempts to use
+ /// `join`, `scope`, or parallel iterators will then operate
+ /// within that threadpool.
+ ///
+ /// # Warning: thread-local data
+ ///
+ /// Because `op` is executing within the Rayon thread-pool,
+ /// thread-local data from the current thread will not be
+ /// accessible.
+ ///
+ /// # Panics
+ ///
+ /// If `op` should panic, that panic will be propagated.
+ ///
+ /// ## Using `install()`
+ ///
+ /// ```rust
+ /// # use rayon_core as rayon;
+ /// fn main() {
+ /// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
+ /// let n = pool.install(|| fib(20));
+ /// println!("{}", n);
+ /// }
+ ///
+ /// fn fib(n: usize) -> usize {
+ /// if n == 0 || n == 1 {
+ /// return n;
+ /// }
+ /// let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
+ /// return a + b;
+ /// }
+ /// ```
+ pub fn install<OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce() -> R + Send,
+ R: Send,
+ {
+ self.registry.in_worker(|_, _| op())
+ }
+
+ /// Returns the (current) number of threads in the thread pool.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Note that unless this thread-pool was created with a
+ /// [`ThreadPoolBuilder`] that specifies the number of threads,
+ /// then this number may vary over time in future versions (see [the
+ /// `num_threads()` method for details][snt]).
+ ///
+ /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+ /// [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
+ #[inline]
+ pub fn current_num_threads(&self) -> usize {
+ self.registry.num_threads()
+ }
+
+ /// If called from a Rayon worker thread in this thread-pool,
+ /// returns the index of that thread; if not called from a Rayon
+ /// thread, or called from a Rayon thread that belongs to a
+ /// different thread-pool, returns `None`.
+ ///
+ /// The index for a given thread will not change over the thread's
+ /// lifetime. However, multiple threads may share the same index if
+ /// they are in distinct thread-pools.
+ ///
+ /// # Future compatibility note
+ ///
+ /// Currently, every thread-pool (including the global
+ /// thread-pool) has a fixed number of threads, but this may
+ /// change in future Rayon versions (see [the `num_threads()` method
+ /// for details][snt]). In that case, the index for a
+ /// thread would not change during its lifetime, but thread
+ /// indices may wind up being reused if threads are terminated and
+ /// restarted.
+ ///
+ /// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+ #[inline]
+ pub fn current_thread_index(&self) -> Option<usize> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.index())
+ }
+
+ /// Returns true if the current worker thread currently has "local
+ /// tasks" pending. This can be useful as part of a heuristic for
+ /// deciding whether to spawn a new task or execute code on the
+ /// current thread, particularly in breadth-first
+ /// schedulers. However, keep in mind that this is an inherently
+ /// racy check, as other worker threads may be actively "stealing"
+ /// tasks from our local deque.
+ ///
+ /// **Background:** Rayon's uses a [work-stealing] scheduler. The
+ /// key idea is that each thread has its own [deque] of
+ /// tasks. Whenever a new task is spawned -- whether through
+ /// `join()`, `Scope::spawn()`, or some other means -- that new
+ /// task is pushed onto the thread's *local* deque. Worker threads
+ /// have a preference for executing their own tasks; if however
+ /// they run out of tasks, they will go try to "steal" tasks from
+ /// other threads. This function therefore has an inherent race
+ /// with other active worker threads, which may be removing items
+ /// from the local deque.
+ ///
+ /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
+ /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
+ #[inline]
+ pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
+ let curr = self.registry.current_thread()?;
+ Some(!curr.local_deque_is_empty())
+ }
+
+ /// Execute `oper_a` and `oper_b` in the thread-pool and return
+ /// the results. Equivalent to `self.install(|| join(oper_a,
+ /// oper_b))`.
+ pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
+ where
+ A: FnOnce() -> RA + Send,
+ B: FnOnce() -> RB + Send,
+ RA: Send,
+ RB: Send,
+ {
+ self.install(|| join(oper_a, oper_b))
+ }
+
+ /// Creates a scope that executes within this thread-pool.
+ /// Equivalent to `self.install(|| scope(...))`.
+ ///
+ /// See also: [the `scope()` function][scope].
+ ///
+ /// [scope]: fn.scope.html
+ pub fn scope<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&Scope<'scope>) -> R + Send,
+ R: Send,
+ {
+ self.install(|| scope(op))
+ }
+
+ /// Creates a scope that executes within this thread-pool.
+ /// Spawns from the same thread are prioritized in relative FIFO order.
+ /// Equivalent to `self.install(|| scope_fifo(...))`.
+ ///
+ /// See also: [the `scope_fifo()` function][scope_fifo].
+ ///
+ /// [scope_fifo]: fn.scope_fifo.html
+ pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
+ R: Send,
+ {
+ self.install(|| scope_fifo(op))
+ }
+
+ /// Spawns an asynchronous task in this thread-pool. This task will
+ /// run in the implicit, global scope, which means that it may outlast
+ /// the current stack frame -- therefore, it cannot capture any references
+ /// onto the stack (you will likely need a `move` closure).
+ ///
+ /// See also: [the `spawn()` function defined on scopes][spawn].
+ ///
+ /// [spawn]: struct.Scope.html#method.spawn
+ pub fn spawn<OP>(&self, op: OP)
+ where
+ OP: FnOnce() + Send + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { spawn::spawn_in(op, &self.registry) }
+ }
+
+ /// Spawns an asynchronous task in this thread-pool. This task will
+ /// run in the implicit, global scope, which means that it may outlast
+ /// the current stack frame -- therefore, it cannot capture any references
+ /// onto the stack (you will likely need a `move` closure).
+ ///
+ /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
+ ///
+ /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo
+ pub fn spawn_fifo<OP>(&self, op: OP)
+ where
+ OP: FnOnce() + Send + 'static,
+ {
+ // We assert that `self.registry` has not terminated.
+ unsafe { spawn::spawn_fifo_in(op, &self.registry) }
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.registry.terminate();
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool")
+ .field("num_threads", &self.current_num_threads())
+ .field("id", &self.registry.id())
+ .finish()
+ }
+}
+
+/// If called from a Rayon worker thread, returns the index of that
+/// thread within its current pool; if not called from a Rayon thread,
+/// returns `None`.
+///
+/// The index for a given thread will not change over the thread's
+/// lifetime. However, multiple threads may share the same index if
+/// they are in distinct thread-pools.
+///
+/// See also: [the `ThreadPool::current_thread_index()` method].
+///
+/// [m]: struct.ThreadPool.html#method.current_thread_index
+///
+/// # Future compatibility note
+///
+/// Currently, every thread-pool (including the global
+/// thread-pool) has a fixed number of threads, but this may
+/// change in future Rayon versions (see [the `num_threads()` method
+/// for details][snt]). In that case, the index for a
+/// thread would not change during its lifetime, but thread
+/// indices may wind up being reused if threads are terminated and
+/// restarted.
+///
+/// [snt]: struct.ThreadPoolBuilder.html#method.num_threads
+#[inline]
+pub fn current_thread_index() -> Option<usize> {
+ unsafe {
+ let curr = WorkerThread::current().as_ref()?;
+ Some(curr.index())
+ }
+}
+
+/// If called from a Rayon worker thread, indicates whether that
+/// thread's local deque still has pending tasks. Otherwise, returns
+/// `None`. For more information, see [the
+/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
+///
+/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
+#[inline]
+pub fn current_thread_has_pending_tasks() -> Option<bool> {
+ unsafe {
+ let curr = WorkerThread::current().as_ref()?;
+ Some(!curr.local_deque_is_empty())
+ }
+}
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
new file mode 100644
index 0000000..8d1c90c
--- /dev/null
+++ b/src/thread_pool/test.rs
@@ -0,0 +1,338 @@
+#![cfg(test)]
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc::channel;
+use std::sync::{Arc, Mutex};
+
+#[allow(deprecated)]
+use crate::Configuration;
+use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
+
+#[test]
+#[should_panic(expected = "Hello, world!")]
+fn panic_propagate() {
+ let thread_pool = ThreadPoolBuilder::new().build().unwrap();
+ thread_pool.install(|| {
+ panic!("Hello, world!");
+ });
+}
+
+#[test]
+fn workers_stop() {
+ let registry;
+
+ {
+ // once we exit this block, thread-pool will be dropped
+ let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ registry = thread_pool.install(|| {
+ // do some work on these threads
+ join_a_lot(22);
+
+ thread_pool.registry.clone()
+ });
+ assert_eq!(registry.num_threads(), 22);
+ }
+
+ // once thread-pool is dropped, registry should terminate, which
+ // should lead to worker threads stopping
+ registry.wait_until_stopped();
+}
+
+fn join_a_lot(n: usize) {
+ if n > 0 {
+ join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
+ }
+}
+
+#[test]
+fn sleeper_stop() {
+ use std::{thread, time};
+
+ let registry;
+
+ {
+ // once we exit this block, thread-pool will be dropped
+ let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
+ registry = thread_pool.registry.clone();
+
+ // Give time for at least some of the thread pool to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+ }
+
+ // once thread-pool is dropped, registry should terminate, which
+ // should lead to worker threads stopping
+ registry.wait_until_stopped();
+}
+
+/// Creates a start/exit handler that increments an atomic counter.
+fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
+ let count = Arc::new(AtomicUsize::new(0));
+ (count.clone(), move |_| {
+ count.fetch_add(1, Ordering::SeqCst);
+ })
+}
+
+/// Wait until a counter is no longer shared, then return its value.
+fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
+ use std::{thread, time};
+
+ for _ in 0..60 {
+ counter = match Arc::try_unwrap(counter) {
+ Ok(counter) => return counter.into_inner(),
+ Err(counter) => {
+ thread::sleep(time::Duration::from_secs(1));
+ counter
+ }
+ };
+ }
+
+ // That's too long!
+ panic!("Counter is still shared!");
+}
+
+#[test]
+fn failed_thread_stack() {
+ // Note: we first tried to force failure with a `usize::MAX` stack, but
+ // macOS and Windows weren't fazed, or at least didn't fail the way we want.
+ // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
+ // 2GB stack, so it might not fail until the second thread.
+ let stack_size = ::std::isize::MAX as usize;
+
+ let (start_count, start_handler) = count_handler();
+ let (exit_count, exit_handler) = count_handler();
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(10)
+ .stack_size(stack_size)
+ .start_handler(start_handler)
+ .exit_handler(exit_handler);
+
+ let pool = builder.build();
+ assert!(pool.is_err(), "thread stack should have failed!");
+
+ // With such a huge stack, 64-bit will probably fail on the first thread;
+ // 32-bit might manage the first 2GB, but certainly fail the second.
+ let start_count = wait_for_counter(start_count);
+ assert!(start_count <= 1);
+ assert_eq!(start_count, wait_for_counter(exit_count));
+}
+
+#[test]
+fn panic_thread_name() {
+ let (start_count, start_handler) = count_handler();
+ let (exit_count, exit_handler) = count_handler();
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(10)
+ .start_handler(start_handler)
+ .exit_handler(exit_handler)
+ .thread_name(|i| {
+ if i >= 5 {
+ panic!();
+ }
+ format!("panic_thread_name#{}", i)
+ });
+
+ let pool = crate::unwind::halt_unwinding(|| builder.build());
+ assert!(pool.is_err(), "thread-name panic should propagate!");
+
+ // Assuming they're created in order, threads 0 through 4 should have
+ // been started already, and then terminated by the panic.
+ assert_eq!(5, wait_for_counter(start_count));
+ assert_eq!(5, wait_for_counter(exit_count));
+}
+
+#[test]
+fn self_install() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ // If the inner `install` blocks, then nothing will actually run it!
+ assert!(pool.install(|| pool.install(|| true)));
+}
+
+#[test]
+fn mutual_install() {
+ let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ let ok = pool1.install(|| {
+ // This creates a dependency from `pool1` -> `pool2`
+ pool2.install(|| {
+ // This creates a dependency from `pool2` -> `pool1`
+ pool1.install(|| {
+ // If they blocked on inter-pool installs, there would be no
+ // threads left to run this!
+ true
+ })
+ })
+ });
+ assert!(ok);
+}
+
+#[test]
+fn mutual_install_sleepy() {
+ use std::{thread, time};
+
+ let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+ let ok = pool1.install(|| {
+ // This creates a dependency from `pool1` -> `pool2`
+ pool2.install(|| {
+ // Give `pool1` time to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+
+ // This creates a dependency from `pool2` -> `pool1`
+ pool1.install(|| {
+ // Give `pool2` time to fall asleep.
+ thread::sleep(time::Duration::from_secs(1));
+
+ // If they blocked on inter-pool installs, there would be no
+ // threads left to run this!
+ true
+ })
+ })
+ });
+ assert!(ok);
+}
+
+#[test]
+#[allow(deprecated)]
+fn check_thread_pool_new() {
+ let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
+ assert_eq!(pool.current_num_threads(), 22);
+}
+
+macro_rules! test_scope_order {
+ ($scope:ident => $spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = builder.build().unwrap();
+ pool.install(|| {
+ let vec = Mutex::new(vec![]);
+ pool.$scope(|scope| {
+ let vec = &vec;
+ for i in 0..10 {
+ scope.$spawn(move |_| {
+ vec.lock().unwrap().push(i);
+ });
+ }
+ });
+ vec.into_inner().unwrap()
+ })
+ }};
+}
+
+#[test]
+fn scope_lifo_order() {
+ let vec = test_scope_order!(scope => spawn);
+ let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn scope_fifo_order() {
+ let vec = test_scope_order!(scope_fifo => spawn_fifo);
+ let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+macro_rules! test_spawn_order {
+ ($spawn:ident) => {{
+ let builder = ThreadPoolBuilder::new().num_threads(1);
+ let pool = &builder.build().unwrap();
+ let (tx, rx) = channel();
+ pool.install(move || {
+ for i in 0..10 {
+ let tx = tx.clone();
+ pool.$spawn(move || {
+ tx.send(i).unwrap();
+ });
+ }
+ });
+ rx.iter().collect::<Vec<i32>>()
+ }};
+}
+
+#[test]
+fn spawn_lifo_order() {
+ let vec = test_spawn_order!(spawn);
+ let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn spawn_fifo_order() {
+ let vec = test_spawn_order!(spawn_fifo);
+ let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
+ assert_eq!(vec, expected);
+}
+
+#[test]
+fn nested_scopes() {
+ // Create matching scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&Scope<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ assert_eq!(counter.into_inner(), pools.len());
+}
+
+#[test]
+fn nested_fifo_scopes() {
+ // Create matching fifo scopes for every thread pool.
+ fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
+ where
+ OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
+ {
+ if let Some((pool, tail)) = pools.split_first() {
+ pool.scope_fifo(move |s| {
+ // This move reduces the reference lifetimes by variance to match s,
+ // but the actual scopes are still tied to the invariant 'scope.
+ let mut scopes = scopes;
+ scopes.push(s);
+ nest(tail, scopes, op)
+ })
+ } else {
+ (op)(&scopes)
+ }
+ }
+
+ let pools: Vec<_> = (0..10)
+ .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
+ .collect();
+
+ let counter = AtomicUsize::new(0);
+ nest(&pools, vec![], |scopes| {
+ for &s in scopes {
+ s.spawn_fifo(|_| {
+ // Our 'scope lets us borrow the counter in every pool.
+ counter.fetch_add(1, Ordering::Relaxed);
+ });
+ }
+ });
+ assert_eq!(counter.into_inner(), pools.len());
+}
diff --git a/src/unwind.rs b/src/unwind.rs
new file mode 100644
index 0000000..9671fa5
--- /dev/null
+++ b/src/unwind.rs
@@ -0,0 +1,31 @@
+//! Package up unwind recovery. Note that if you are in some sensitive
+//! place, you can use the `AbortIfPanic` helper to protect against
+//! accidental panics in the rayon code itself.
+
+use std::any::Any;
+use std::panic::{self, AssertUnwindSafe};
+use std::thread;
+
+/// Executes `f` and captures any panic, translating that panic into a
+/// `Err` result. The assumption is that any panic will be propagated
+/// later with `resume_unwinding`, and hence `f` can be treated as
+/// exception safe.
+pub(super) fn halt_unwinding<F, R>(func: F) -> thread::Result<R>
+where
+ F: FnOnce() -> R,
+{
+ panic::catch_unwind(AssertUnwindSafe(func))
+}
+
+pub(super) fn resume_unwinding(payload: Box<dyn Any + Send>) -> ! {
+ panic::resume_unwind(payload)
+}
+
+pub(super) struct AbortIfPanic;
+
+impl Drop for AbortIfPanic {
+ fn drop(&mut self) {
+ eprintln!("Rayon: detected unexpected panic; aborting");
+ ::std::process::abort();
+ }
+}
diff --git a/src/util.rs b/src/util.rs
new file mode 100644
index 0000000..27d375a
--- /dev/null
+++ b/src/util.rs
@@ -0,0 +1,10 @@
+use std::mem;
+
+pub(super) fn leak<T>(v: T) -> &'static T {
+ unsafe {
+ let b = Box::new(v);
+ let p: *const T = &*b;
+ mem::forget(b); // leak our reference, so that `b` is never freed
+ &*p
+ }
+}
diff --git a/tests/double_init_fail.rs b/tests/double_init_fail.rs
new file mode 100644
index 0000000..ea06bf0
--- /dev/null
+++ b/tests/double_init_fail.rs
@@ -0,0 +1,14 @@
+use rayon_core::ThreadPoolBuilder;
+use std::error::Error;
+
+#[test]
+fn double_init_fail() {
+ let result1 = ThreadPoolBuilder::new().build_global();
+ assert_eq!(result1.unwrap(), ());
+ let err = ThreadPoolBuilder::new().build_global().unwrap_err();
+ assert!(err.source().is_none());
+ assert_eq!(
+ err.to_string(),
+ "The global thread pool has already been initialized.",
+ );
+}
diff --git a/tests/init_zero_threads.rs b/tests/init_zero_threads.rs
new file mode 100644
index 0000000..ebd73c5
--- /dev/null
+++ b/tests/init_zero_threads.rs
@@ -0,0 +1,9 @@
+use rayon_core::ThreadPoolBuilder;
+
+#[test]
+fn init_zero_threads() {
+ ThreadPoolBuilder::new()
+ .num_threads(0)
+ .build_global()
+ .unwrap();
+}
diff --git a/tests/scope_join.rs b/tests/scope_join.rs
new file mode 100644
index 0000000..9d88133
--- /dev/null
+++ b/tests/scope_join.rs
@@ -0,0 +1,45 @@
+/// Test that one can emulate join with `scope`:
+fn pseudo_join<F, G>(f: F, g: G)
+where
+ F: FnOnce() + Send,
+ G: FnOnce() + Send,
+{
+ rayon_core::scope(|s| {
+ s.spawn(|_| g());
+ f();
+ });
+}
+
+fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) {
+ if v.len() <= 1 {
+ return;
+ }
+
+ let mid = partition(v);
+ let (lo, hi) = v.split_at_mut(mid);
+ pseudo_join(|| quick_sort(lo), || quick_sort(hi));
+}
+
+fn partition<T: PartialOrd + Send>(v: &mut [T]) -> usize {
+ let pivot = v.len() - 1;
+ let mut i = 0;
+ for j in 0..pivot {
+ if v[j] <= v[pivot] {
+ v.swap(i, j);
+ i += 1;
+ }
+ }
+ v.swap(i, pivot);
+ i
+}
+
+fn is_sorted<T: Send + Ord>(v: &[T]) -> bool {
+ (1..v.len()).all(|i| v[i - 1] <= v[i])
+}
+
+#[test]
+fn scope_join() {
+ let mut v: Vec<i32> = (0..256).rev().collect();
+ quick_sort(&mut v);
+ assert!(is_sorted(&v));
+}
diff --git a/tests/scoped_threadpool.rs b/tests/scoped_threadpool.rs
new file mode 100644
index 0000000..db3d0b8
--- /dev/null
+++ b/tests/scoped_threadpool.rs
@@ -0,0 +1,96 @@
+use crossbeam_utils::thread;
+use rayon_core::ThreadPoolBuilder;
+
+#[derive(PartialEq, Eq, Debug)]
+struct Local(i32);
+
+scoped_tls::scoped_thread_local!(static LOCAL: Local);
+
+#[test]
+fn missing_scoped_tls() {
+ LOCAL.set(&Local(42), || {
+ let pool = ThreadPoolBuilder::new()
+ .build()
+ .expect("thread pool created");
+
+ // `LOCAL` is not set in the pool.
+ pool.install(|| {
+ assert!(!LOCAL.is_set());
+ });
+ });
+}
+
+#[test]
+fn spawn_scoped_tls_threadpool() {
+ LOCAL.set(&Local(42), || {
+ LOCAL.with(|x| {
+ thread::scope(|scope| {
+ let pool = ThreadPoolBuilder::new()
+ .spawn_handler(move |thread| {
+ scope
+ .builder()
+ .spawn(move |_| {
+ // Borrow the same local value in the thread pool.
+ LOCAL.set(x, || thread.run())
+ })
+ .map(|_| ())
+ })
+ .build()
+ .expect("thread pool created");
+
+ // The pool matches our local value.
+ pool.install(|| {
+ assert!(LOCAL.is_set());
+ LOCAL.with(|y| {
+ assert_eq!(x, y);
+ });
+ });
+
+ // If we change our local value, the pool is not affected.
+ LOCAL.set(&Local(-1), || {
+ pool.install(|| {
+ assert!(LOCAL.is_set());
+ LOCAL.with(|y| {
+ assert_eq!(x, y);
+ });
+ });
+ });
+ })
+ .expect("scope threads ok");
+ // `thread::scope` will wait for the threads to exit before returning.
+ });
+ });
+}
+
+#[test]
+fn build_scoped_tls_threadpool() {
+ LOCAL.set(&Local(42), || {
+ LOCAL.with(|x| {
+ ThreadPoolBuilder::new()
+ .build_scoped(
+ move |thread| LOCAL.set(x, || thread.run()),
+ |pool| {
+ // The pool matches our local value.
+ pool.install(|| {
+ assert!(LOCAL.is_set());
+ LOCAL.with(|y| {
+ assert_eq!(x, y);
+ });
+ });
+
+ // If we change our local value, the pool is not affected.
+ LOCAL.set(&Local(-1), || {
+ pool.install(|| {
+ assert!(LOCAL.is_set());
+ LOCAL.with(|y| {
+ assert_eq!(x, y);
+ });
+ });
+ });
+ },
+ )
+ .expect("thread pool created");
+ // Internally, `crossbeam::scope` will wait for the threads to exit before returning.
+ });
+ });
+}
diff --git a/tests/simple_panic.rs b/tests/simple_panic.rs
new file mode 100644
index 0000000..2564482
--- /dev/null
+++ b/tests/simple_panic.rs
@@ -0,0 +1,7 @@
+use rayon_core::join;
+
+#[test]
+#[should_panic(expected = "should panic")]
+fn simple_panic() {
+ join(|| {}, || panic!("should panic"));
+}
diff --git a/tests/stack_overflow_crash.rs b/tests/stack_overflow_crash.rs
new file mode 100644
index 0000000..6128898
--- /dev/null
+++ b/tests/stack_overflow_crash.rs
@@ -0,0 +1,82 @@
+use rayon_core::ThreadPoolBuilder;
+
+use std::env;
+use std::process::Command;
+
+#[cfg(target_os = "linux")]
+use std::os::unix::process::ExitStatusExt;
+
+fn force_stack_overflow(depth: u32) {
+ let _buffer = [0u8; 1024 * 1024];
+ if depth > 0 {
+ force_stack_overflow(depth - 1);
+ }
+}
+
+#[cfg(unix)]
+fn disable_core() {
+ unsafe {
+ libc::setrlimit(
+ libc::RLIMIT_CORE,
+ &libc::rlimit {
+ rlim_cur: 0,
+ rlim_max: 0,
+ },
+ );
+ }
+}
+
+#[cfg(unix)]
+fn overflow_code() -> Option<i32> {
+ None
+}
+
+#[cfg(windows)]
+fn overflow_code() -> Option<i32> {
+ use std::os::windows::process::ExitStatusExt;
+ use std::process::ExitStatus;
+
+ ExitStatus::from_raw(0xc00000fd /*STATUS_STACK_OVERFLOW*/).code()
+}
+
+fn main() {
+ if env::args().len() == 1 {
+ // first check that the recursivecall actually causes a stack overflow, and does not get optimized away
+ {
+ let status = Command::new(env::current_exe().unwrap())
+ .arg("8")
+ .status()
+ .unwrap();
+
+ #[cfg(any(unix, windows))]
+ assert_eq!(status.code(), overflow_code());
+
+ #[cfg(target_os = "linux")]
+ assert!(
+ status.signal() == Some(11 /*SIGABRT*/) || status.signal() == Some(6 /*SIGSEGV*/)
+ );
+ }
+
+ // now run with a larger stack and verify correct operation
+ {
+ let status = Command::new(env::current_exe().unwrap())
+ .arg("48")
+ .status()
+ .unwrap();
+ assert_eq!(status.code(), Some(0));
+ #[cfg(target_os = "linux")]
+ assert_eq!(status.signal(), None);
+ }
+ } else {
+ let stack_size_in_mb: usize = env::args().nth(1).unwrap().parse().unwrap();
+ let pool = ThreadPoolBuilder::new()
+ .stack_size(stack_size_in_mb * 1024 * 1024)
+ .build()
+ .unwrap();
+ pool.install(|| {
+ #[cfg(unix)]
+ disable_core();
+ force_stack_overflow(32);
+ });
+ }
+}