author     Jeff Vander Stoep <jeffv@google.com>  2023-04-11 10:36:26 +0000
committer  Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>  2023-04-11 10:36:26 +0000
commit     ca7cb9143b50102678fd0ba2bf882f0c8e2896dc (patch)
tree       ded15626a7744fe21c3f07d1e6d9230e28909490
parent     68d6a0ca78a8c8c21541a296e19677751ca16bd1 (diff)
parent     f110d982be1a009b2da88e6feeeb261762daa4dc (diff)
download   rayon-core-ca7cb9143b50102678fd0ba2bf882f0c8e2896dc.tar.gz
Upgrade rayon-core to 1.11.0 am: f1feb8e18d am: 0fd8a3c31f am: 6a06caa05d am: 097a1e8e18 am: f110d982be
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/rayon-core/+/2520315

Change-Id: I7b47fddd9167bbf627dbe617f84e5c7a25b2d8f1
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--  .cargo_vcs_info.json            2
-rw-r--r--  Android.bp                      2
-rw-r--r--  Cargo.toml                      4
-rw-r--r--  Cargo.toml.orig                 4
-rw-r--r--  METADATA                        8
-rw-r--r--  README.md                       2
-rw-r--r--  src/broadcast/test.rs          46
-rw-r--r--  src/job.rs                      8
-rw-r--r--  src/join/mod.rs                 3
-rw-r--r--  src/join/test.rs                6
-rw-r--r--  src/lib.rs                     24
-rw-r--r--  src/registry.rs               145
-rw-r--r--  src/scope/mod.rs                2
-rw-r--r--  src/scope/test.rs              18
-rw-r--r--  src/spawn/mod.rs                2
-rw-r--r--  src/spawn/test.rs              12
-rw-r--r--  src/test.rs                     8
-rw-r--r--  src/thread_pool/mod.rs         69
-rw-r--r--  src/thread_pool/test.rs        52
-rw-r--r--  tests/double_init_fail.rs       1
-rw-r--r--  tests/init_zero_threads.rs      1
-rw-r--r--  tests/scoped_threadpool.rs      3
-rw-r--r--  tests/stack_overflow_crash.rs   2
23 files changed, 365 insertions(+), 59 deletions(-)
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 213b63f..2a30505 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "b6cdc9da7adc7fe42b28758b2033f0bf8f8dc4b8"
+ "sha1": "6236214d717694917e77aa1c16d91176b9bc2fff"
},
"path_in_vcs": "rayon-core"
}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index cbcd437..54886c4 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@ rust_library {
host_supported: true,
crate_name: "rayon_core",
cargo_env_compat: true,
- cargo_pkg_version: "1.10.2",
+ cargo_pkg_version: "1.11.0",
srcs: ["src/lib.rs"],
edition: "2021",
rustlibs: [
diff --git a/Cargo.toml b/Cargo.toml
index d7e36a4..d41715e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,9 @@
[package]
edition = "2021"
-rust-version = "1.56"
+rust-version = "1.59"
name = "rayon-core"
-version = "1.10.2"
+version = "1.11.0"
authors = [
"Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>",
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 878dac7..920ffe5 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,13 +1,13 @@
[package]
name = "rayon-core"
-version = "1.10.2"
+version = "1.11.0"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rayon-rs/rayon"
documentation = "https://docs.rs/rayon/"
-rust-version = "1.56"
+rust-version = "1.59"
edition = "2021"
links = "rayon-core"
build = "build.rs"
diff --git a/METADATA b/METADATA
index ae2b730..e9917f9 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/rayon-core/rayon-core-1.10.2.crate"
+ value: "https://static.crates.io/crates/rayon-core/rayon-core-1.11.0.crate"
}
- version: "1.10.2"
+ version: "1.11.0"
license_type: NOTICE
last_upgrade_date {
year: 2023
- month: 2
- day: 17
+ month: 4
+ day: 3
}
}
diff --git a/README.md b/README.md
index 7bf2d9b..448901b 100644
--- a/README.md
+++ b/README.md
@@ -8,4 +8,4 @@ Please see [Rayon Docs] for details about using Rayon.
[Rayon Docs]: https://docs.rs/rayon/
-Rayon-core currently requires `rustc 1.56.0` or greater.
+Rayon-core currently requires `rustc 1.59.0` or greater.
diff --git a/src/broadcast/test.rs b/src/broadcast/test.rs
index a765cb0..3ae11f7 100644
--- a/src/broadcast/test.rs
+++ b/src/broadcast/test.rs
@@ -12,6 +12,7 @@ fn broadcast_global() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
let (tx, rx) = crossbeam_channel::unbounded();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
@@ -22,6 +23,7 @@ fn spawn_broadcast_global() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.broadcast(|ctx| ctx.index());
@@ -29,6 +31,7 @@ fn broadcast_pool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -40,6 +43,7 @@ fn spawn_broadcast_pool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
@@ -47,6 +51,7 @@ fn broadcast_self() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -58,6 +63,7 @@ fn spawn_broadcast_self() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -73,6 +79,7 @@ fn broadcast_mutual() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
@@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
@@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -146,6 +156,7 @@ fn broadcast_panic_one() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -182,6 +194,7 @@ fn broadcast_panic_many() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
@@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
let test_duration = time::Duration::from_secs(1);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -214,3 +228,35 @@ fn broadcast_sleep_race() {
});
}
}
+
+#[test]
+fn broadcast_after_spawn_broadcast() {
+ let (tx, rx) = crossbeam_channel::unbounded();
+
+ // Queue a non-blocking spawn_broadcast.
+ crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
+
+ // This blocking broadcast runs after all prior broadcasts.
+ crate::broadcast(|_| {});
+
+ // The spawn_broadcast **must** have run by now on all threads.
+ let mut v: Vec<_> = rx.try_iter().collect();
+ v.sort_unstable();
+ assert!(v.into_iter().eq(0..crate::current_num_threads()));
+}
+
+#[test]
+fn broadcast_after_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn on a thread-local deque.
+ crate::registry::in_worker(move |_, _| {
+ crate::spawn(move || tx.send(22).unwrap());
+ });
+
+ // Broadcast runs after the local deque is empty.
+ crate::broadcast(|_| {});
+
+ // The spawn **must** have run by now.
+ assert_eq!(22, rx.try_recv().unwrap());
+}
diff --git a/src/job.rs b/src/job.rs
index deccebc..5664bb3 100644
--- a/src/job.rs
+++ b/src/job.rs
@@ -30,7 +30,6 @@ pub(super) trait Job {
/// Internally, we store the job's data in a `*const ()` pointer. The
/// true type is something like `*const StackJob<...>`, but we hide
/// it. We also carry the "execute fn" from the `Job` trait.
-#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub(super) struct JobRef {
pointer: *const (),
execute_fn: unsafe fn(*const ()),
@@ -53,6 +52,13 @@ impl JobRef {
}
}
+ /// Returns an opaque handle that can be saved and compared,
+ /// without making `JobRef` itself `Copy + Eq`.
+ #[inline]
+ pub(super) fn id(&self) -> impl Eq {
+ (self.pointer, self.execute_fn)
+ }
+
#[inline]
pub(super) unsafe fn execute(self) {
(self.execute_fn)(self.pointer)
diff --git a/src/join/mod.rs b/src/join/mod.rs
index d72c7e6..5ab9f6b 100644
--- a/src/join/mod.rs
+++ b/src/join/mod.rs
@@ -135,6 +135,7 @@ where
// long enough.
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
+ let job_b_id = job_b_ref.id();
worker_thread.push(job_b_ref);
// Execute task a; hopefully b gets stolen in the meantime.
@@ -151,7 +152,7 @@ where
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.take_local_job() {
- if job == job_b_ref {
+ if job_b_id == job.id() {
// Found it! Let's run it.
//
// Note that this could panic, but it's ok if we unwind here.
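The `id()` method introduced above replaces the derived `Copy + Eq` on `JobRef`: `join` can still recognize its own queued job when it pops it back off the local deque, but nothing can accidentally duplicate the raw job pointer anymore. A minimal standalone sketch of the same opaque-identity idiom, using illustrative names (`TaskRef` and `execute_noop` are not rayon types):

    struct TaskRef {
        pointer: *const (),
        execute_fn: unsafe fn(*const ()),
    }

    impl TaskRef {
        // An opaque, comparable identity: callers can remember which task
        // they pushed without `TaskRef` itself being `Copy`/`Eq`.
        fn id(&self) -> impl Eq {
            (self.pointer, self.execute_fn)
        }
    }

    unsafe fn execute_noop(_: *const ()) {}

    fn main() {
        let task = TaskRef {
            pointer: std::ptr::null(),
            execute_fn: execute_noop,
        };
        let remembered = task.id();
        // Later, e.g. after popping a job back off the local deque:
        assert!(remembered == task.id());
    }

Because `impl Eq` also carries the `PartialEq` supertrait, two values returned by `id()` can be compared with `==` even though the concrete tuple type stays hidden.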
diff --git a/src/join/test.rs b/src/join/test.rs
index e7f287f..b303dbc 100644
--- a/src/join/test.rs
+++ b/src/join/test.rs
@@ -47,6 +47,7 @@ fn sort() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
@@ -77,6 +78,7 @@ fn panic_propagate_both() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
@@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
@@ -94,6 +97,7 @@ fn join_context_both() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -104,6 +108,7 @@ fn join_context_neither() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;
@@ -127,6 +132,7 @@ fn join_context_second() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;
diff --git a/src/lib.rs b/src/lib.rs
index b31a2d7..c9694ee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,24 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
-//! ## Restricting multiple versions
+//! # Global fallback when threading is unsupported
+//!
+//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
+//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
+//! targets are notable examples of this. Rather than panicking on the unsupported error when
+//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
+//!
+//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
+//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
+//! there is no other thread to share the work. However, since the pool is not running independently
+//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
+//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
+//! anything like thread preemption or `async` task switching, but `yield_now` or `yield_local`
+//! can also volunteer execution time.
+//!
+//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
+//!
+//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
@@ -85,6 +102,7 @@ pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
+pub use self::thread_pool::{yield_local, yield_now, Yield};
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
@@ -707,6 +725,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}
+
+ fn is_unsupported(&self) -> bool {
+ matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
+ }
}
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
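The new doc section documents two behaviors worth seeing side by side: explicit builder methods report the `Unsupported` error as-is, while the implicit global pool silently degrades to the sequential fallback. A small sketch of that contract from the caller's side, assuming only the public `rayon_core` API (on ordinary targets `build_global` simply succeeds and `join` runs in parallel):

    use rayon_core::ThreadPoolBuilder;

    fn main() {
        // Explicit initialization reports any error, with no fallback applied.
        if let Err(e) = ThreadPoolBuilder::new().num_threads(4).build_global() {
            eprintln!("could not build a real thread pool: {}", e);
        }

        // The implicit global pool still works either way: in fallback mode,
        // `join` simply runs its two closures sequentially on this thread.
        let (a, b) = rayon_core::join(|| 1 + 1, || 2 + 2);
        assert_eq!((a, b), (2, 4));
    }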
diff --git a/src/registry.rs b/src/registry.rs
index 24c0855..5d56ac9 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -6,6 +6,7 @@ use crate::sleep::Sleep;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+ Yield,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::cell::Cell;
@@ -50,7 +51,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
- unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
+ unsafe { main_loop(self) }
}
}
@@ -164,7 +165,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
- set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
+ set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -198,6 +199,46 @@ where
result
}
+fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
+ let result = Registry::new(ThreadPoolBuilder::new());
+
+ // If we're running in an environment that doesn't support threads at all, we can fall back to
+ // using the current thread alone. This is crude, and probably won't work for non-blocking
+ // calls like `spawn` or `spawn_broadcast`, but a lot of stuff does work fine.
+ //
+ // Notably, this allows current WebAssembly targets to work even though their threading support
+ // is stubbed out, and we won't have to change anything if they do add real threading.
+ let unsupported = matches!(&result, Err(e) if e.is_unsupported());
+ if unsupported && WorkerThread::current().is_null() {
+ let builder = ThreadPoolBuilder::new()
+ .num_threads(1)
+ .spawn_handler(|thread| {
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
+ let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+
+ // let registry know we are ready to do work
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+
+ Ok(())
+ });
+
+ let fallback_result = Registry::new(builder);
+ if fallback_result.is_ok() {
+ return fallback_result;
+ }
+ }
+
+ result
+}
+
struct Terminator<'a>(&'a Arc<Registry>);
impl<'a> Drop for Terminator<'a> {
@@ -376,7 +417,7 @@ impl Registry {
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
- self.inject(&[job_ref]);
+ self.inject(job_ref);
}
}
}
@@ -384,10 +425,8 @@ impl Registry {
/// Push a job into the "external jobs" queue; it will be taken by
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
- pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
- self.log(|| JobsInjected {
- count: injected_jobs.len(),
- });
+ pub(super) fn inject(&self, injected_job: JobRef) {
+ self.log(|| JobsInjected { count: 1 });
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
@@ -402,12 +441,8 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
- for &job_ref in injected_jobs {
- self.injected_jobs.push(job_ref);
- }
-
- self.sleep
- .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
+ self.injected_jobs.push(injected_job);
+ self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
@@ -507,7 +542,7 @@ impl Registry {
},
LatchRef::new(l),
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
// flush accumulated logs as we exit the thread
@@ -535,7 +570,7 @@ impl Registry {
},
latch,
);
- self.inject(&[job.as_job_ref()]);
+ self.inject(job.as_job_ref());
current_thread.wait_until(&job.latch);
job.into_result()
}
@@ -652,7 +687,20 @@ pub(super) struct WorkerThread {
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
- static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
+ static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null()) };
+}
+
+impl From<ThreadBuilder> for WorkerThread {
+ fn from(thread: ThreadBuilder) -> Self {
+ Self {
+ worker: thread.worker,
+ stealer: thread.stealer,
+ fifo: JobFifo::new(),
+ index: thread.index,
+ rng: XorShift64Star::new(),
+ registry: thread.registry,
+ }
+ }
}
impl Drop for WorkerThread {
@@ -725,7 +773,7 @@ impl WorkerThread {
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
- pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
+ pub(super) fn take_local_job(&self) -> Option<JobRef> {
let popped_job = self.worker.pop();
if popped_job.is_some() {
@@ -767,16 +815,7 @@ impl WorkerThread {
let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
while !latch.probe() {
- // Try to find some work to do. We give preference first
- // to things in our local deque, then in other workers
- // deques, and finally to injected jobs from the
- // outside. The idea is to finish what we started before
- // we take on something new.
- if let Some(job) = self
- .take_local_job()
- .or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
- {
+ if let Some(job) = self.find_work() {
self.registry.sleep.work_found(idle_state);
self.execute(job);
idle_state = self.registry.sleep.start_looking(self.index, latch);
@@ -799,6 +838,37 @@ impl WorkerThread {
mem::forget(abort_guard); // successful execution, do not abort
}
+ fn find_work(&self) -> Option<JobRef> {
+ // Try to find some work to do. We give preference first
+ // to things in our local deque, then in other workers
+ // deques, and finally to injected jobs from the
+ // outside. The idea is to finish what we started before
+ // we take on something new.
+ self.take_local_job()
+ .or_else(|| self.steal())
+ .or_else(|| self.registry.pop_injected_job(self.index))
+ }
+
+ pub(super) fn yield_now(&self) -> Yield {
+ match self.find_work() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
+ pub(super) fn yield_local(&self) -> Yield {
+ match self.take_local_job() {
+ Some(job) => unsafe {
+ self.execute(job);
+ Yield::Executed
+ },
+ None => Yield::Idle,
+ }
+ }
+
#[inline]
pub(super) unsafe fn execute(&self, job: JobRef) {
job.execute();
@@ -808,7 +878,7 @@ impl WorkerThread {
///
/// This should only be done as a last resort, when there is no
/// local work to do.
- unsafe fn steal(&self) -> Option<JobRef> {
+ fn steal(&self) -> Option<JobRef> {
// we only steal when we don't have any work to do locally
debug_assert!(self.local_deque_is_empty());
@@ -851,22 +921,11 @@ impl WorkerThread {
/// ////////////////////////////////////////////////////////////////////////
-unsafe fn main_loop(
- worker: Worker<JobRef>,
- stealer: Stealer<JobRef>,
- registry: Arc<Registry>,
- index: usize,
-) {
- let worker_thread = &WorkerThread {
- worker,
- stealer,
- fifo: JobFifo::new(),
- index,
- rng: XorShift64Star::new(),
- registry,
- };
+unsafe fn main_loop(thread: ThreadBuilder) {
+ let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
+ let index = worker_thread.index;
// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
@@ -924,7 +983,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
- global_registry().in_worker_cold(op)
+ global_registry().in_worker(op)
}
}
}
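The fallback above reuses the `spawn_handler` mechanism that `ThreadPoolBuilder` already exposes publicly; the internal handler merely "spawns" onto the current thread without entering the main loop. For contrast, a sketch of ordinary public usage of that hook, launching each worker on a named std thread (nothing beyond the public builder API is assumed):

    use rayon_core::ThreadPoolBuilder;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let pool = ThreadPoolBuilder::new()
            .num_threads(2)
            .spawn_handler(|thread| {
                // Launch each worker on an ordinary std thread; `thread.run()`
                // enters the worker's main loop and returns at pool shutdown.
                let mut builder = std::thread::Builder::new();
                if let Some(name) = thread.name() {
                    builder = builder.name(name.to_owned());
                }
                if let Some(size) = thread.stack_size() {
                    builder = builder.stack_size(size);
                }
                builder.spawn(|| thread.run())?;
                Ok(())
            })
            .build()?;

        let sum = pool.install(|| (1..=10).sum::<i32>());
        assert_eq!(sum, 55);
        Ok(())
    }

The fallback handler in the diff differs from this only in that it registers the calling thread itself as the worker and returns immediately instead of spawning and running the loop.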
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
index b014cf0..f460dd7 100644
--- a/src/scope/mod.rs
+++ b/src/scope/mod.rs
@@ -615,7 +615,7 @@ impl<'scope> ScopeFifo<'scope> {
// SAFETY: this job will execute before the scope ends.
unsafe { worker.push(fifo.push(job_ref)) };
}
- None => self.base.registry.inject(&[job_ref]),
+ None => self.base.registry.inject(job_ref),
}
}
diff --git a/src/scope/test.rs b/src/scope/test.rs
index 00dd18c..ad8c4af 100644
--- a/src/scope/test.rs
+++ b/src/scope/test.rs
@@ -148,6 +148,7 @@ fn update_tree() {
/// linearly with N. We test this by some unsafe hackery and
/// permitting an approx 10% change with a 10x input change.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn linear_stack_growth() {
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = builder.build().unwrap();
@@ -213,6 +214,7 @@ fn panic_propagate_nested_scope_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_1() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -227,6 +229,7 @@ fn panic_propagate_still_execute_1() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_2() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -241,6 +244,7 @@ fn panic_propagate_still_execute_2() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_3() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -255,6 +259,7 @@ fn panic_propagate_still_execute_3() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_propagate_still_execute_4() {
let mut x = false;
match unwind::halt_unwinding(|| {
@@ -292,6 +297,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_order!(scope => spawn);
@@ -300,6 +306,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_order!(scope_fifo => spawn_fifo);
@@ -334,6 +341,7 @@ macro_rules! test_nested_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_order() {
// In the absence of stealing, `scope()` runs its `spawn()` jobs in LIFO order.
let vec = test_nested_order!(scope => spawn, scope => spawn);
@@ -342,6 +350,7 @@ fn nested_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_order() {
// In the absence of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order.
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
@@ -350,6 +359,7 @@ fn nested_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo);
@@ -361,6 +371,7 @@ fn nested_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn);
@@ -403,6 +414,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -412,6 +424,7 @@ fn mixed_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo);
let expected = vec![-1, 0, -2, 1, -3, 2, 3];
@@ -419,6 +432,7 @@ fn mixed_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
// NB: the end of the inner scope makes us execute some of the outer scope
// before they've all been spawned, so they're not perfectly LIFO.
@@ -428,6 +442,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn);
let expected = vec![-3, 0, -2, 1, -1, 2, 3];
@@ -553,6 +568,7 @@ fn scope_spawn_broadcast_nested() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_barrier() {
let barrier = Barrier::new(8);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -565,6 +581,7 @@ fn scope_spawn_broadcast_barrier() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
@@ -583,6 +600,7 @@ fn scope_spawn_broadcast_panic_one() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_spawn_broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
diff --git a/src/spawn/mod.rs b/src/spawn/mod.rs
index ae1f211..1aa9edb 100644
--- a/src/spawn/mod.rs
+++ b/src/spawn/mod.rs
@@ -154,7 +154,7 @@ where
// in a locally-FIFO order. Otherwise, just use the pool's global injector.
match registry.current_thread() {
Some(worker) => worker.push_fifo(job_ref),
- None => registry.inject(&[job_ref]),
+ None => registry.inject(job_ref),
}
mem::forget(abort_guard);
}
diff --git a/src/spawn/test.rs b/src/spawn/test.rs
index 761fafc..b7a0535 100644
--- a/src/spawn/test.rs
+++ b/src/spawn/test.rs
@@ -7,6 +7,7 @@ use super::{spawn, spawn_fifo};
use crate::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_in_worker() {
let (tx, rx) = channel();
scope(move |_| {
@@ -16,6 +17,7 @@ fn spawn_then_join_in_worker() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_then_join_outside_worker() {
let (tx, rx) = channel();
spawn(move || tx.send(22).unwrap());
@@ -23,6 +25,7 @@ fn spawn_then_join_outside_worker() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_fwd() {
let (tx, rx) = channel();
@@ -54,6 +57,7 @@ fn panic_fwd() {
/// still active asynchronous tasks. We expect the thread-pool to stay
/// alive and executing until those threads are complete.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn termination_while_things_are_executing() {
let (tx0, rx0) = channel();
let (tx1, rx1) = channel();
@@ -80,6 +84,7 @@ fn termination_while_things_are_executing() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_spawn() {
let (tx, rx) = channel();
@@ -107,6 +112,7 @@ fn custom_panic_handler_and_spawn() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn custom_panic_handler_and_nested_spawn() {
let (tx, rx) = channel();
@@ -165,6 +171,7 @@ macro_rules! test_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_order() {
// In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
let vec = test_order!(spawn, spawn);
@@ -173,6 +180,7 @@ fn lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_order() {
// In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
let vec = test_order!(spawn_fifo, spawn_fifo);
@@ -181,6 +189,7 @@ fn fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn lifo_fifo_order() {
// LIFO on the outside, FIFO on the inside
let vec = test_order!(spawn, spawn_fifo);
@@ -192,6 +201,7 @@ fn lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn fifo_lifo_order() {
// FIFO on the outside, LIFO on the inside
let vec = test_order!(spawn_fifo, spawn);
@@ -229,6 +239,7 @@ macro_rules! test_mixed_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_lifo_fifo_order() {
let vec = test_mixed_order!(spawn, spawn_fifo);
let expected = vec![3, -1, 2, -2, 1, -3, 0];
@@ -236,6 +247,7 @@ fn mixed_lifo_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mixed_fifo_lifo_order() {
let vec = test_mixed_order!(spawn_fifo, spawn);
let expected = vec![0, -3, 1, -2, 2, -1, 3];
diff --git a/src/test.rs b/src/test.rs
index 46d63a7..25b8487 100644
--- a/src/test.rs
+++ b/src/test.rs
@@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn worker_thread_index() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -14,6 +15,7 @@ fn worker_thread_index() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn start_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -40,6 +42,7 @@ fn start_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn exit_callback_called() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -69,6 +72,7 @@ fn exit_callback_called() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn handler_panics_handled_correctly() {
let n_threads = 16;
let n_called = Arc::new(AtomicUsize::new(0));
@@ -119,6 +123,7 @@ fn handler_panics_handled_correctly() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_config_build() {
let pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -134,6 +139,7 @@ fn check_error_send_sync() {
#[allow(deprecated)]
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn configuration() {
let start_handler = move |_| {};
let exit_handler = move |_| {};
@@ -154,6 +160,7 @@ fn configuration() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn default_pool() {
ThreadPoolBuilder::default().build().unwrap();
}
@@ -162,6 +169,7 @@ fn default_pool() {
/// the pool is done with them, allowing them to be used with rayon again
/// later. e.g. WebAssembly want to have their own pool of available threads.
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn cleared_current_thread() -> Result<(), ThreadPoolBuildError> {
let n_threads = 5;
let mut handles = vec![];
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
index 0fc06dd..c37826e 100644
--- a/src/thread_pool/mod.rs
+++ b/src/thread_pool/mod.rs
@@ -339,6 +339,30 @@ impl ThreadPool {
// We assert that `self.registry` has not terminated.
unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
}
+
+ /// Cooperatively yields execution to Rayon.
+ ///
+ /// This is similar to the general [`yield_now()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_now(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_now())
+ }
+
+ /// Cooperatively yields execution to local Rayon work.
+ ///
+ /// This is similar to the general [`yield_local()`], but only if the current
+ /// thread is part of *this* thread pool.
+ ///
+ /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+ /// nothing was available, or `None` if the current thread is not part of this pool.
+ pub fn yield_local(&self) -> Option<Yield> {
+ let curr = self.registry.current_thread()?;
+ Some(curr.yield_local())
+ }
}
impl Drop for ThreadPool {
@@ -400,3 +424,48 @@ pub fn current_thread_has_pending_tasks() -> Option<bool> {
Some(!curr.local_deque_is_empty())
}
}
+
+/// Cooperatively yields execution to Rayon.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in the pool, then executes it. Completion of
+/// that work might include nested work or further work stealing.
+///
+/// This is similar to [`std::thread::yield_now()`], but does not literally make
+/// that call. If you are implementing a polling loop, you may want to also
+/// yield to the OS scheduler yourself if no Rayon work was found.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_now() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_now())
+ }
+}
+
+/// Cooperatively yields execution to local Rayon work.
+///
+/// If the current thread is part of a rayon thread pool, this looks for a
+/// single unit of pending work in this thread's queue, then executes it.
+/// Completion of that work might include nested work or further work stealing.
+///
+/// This is similar to [`yield_now()`], but does not steal from other threads.
+///
+/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
+/// nothing was available, or `None` if this thread is not part of any pool at all.
+pub fn yield_local() -> Option<Yield> {
+ unsafe {
+ let thread = WorkerThread::current().as_ref()?;
+ Some(thread.yield_local())
+ }
+}
+
+/// Result of [`yield_now()`] or [`yield_local()`].
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Yield {
+ /// Work was found and executed.
+ Executed,
+ /// No available work was found.
+ Idle,
+}
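As the documentation above notes, `yield_now()` does not itself call `std::thread::yield_now()`, so a polling loop should fall back to the OS scheduler when no Rayon work turns up. A small sketch of that pattern (`wait_for` is an illustrative helper, not part of the API):

    use rayon_core::Yield;
    use std::sync::atomic::{AtomicBool, Ordering};

    // Poll a flag, volunteering this thread to run pending Rayon work
    // while we wait.
    fn wait_for(flag: &AtomicBool) {
        while !flag.load(Ordering::Acquire) {
            match rayon_core::yield_now() {
                // Ran one unit of pool work; re-check the flag right away.
                Some(Yield::Executed) => {}
                // No Rayon work (or not on a worker): yield to the OS instead.
                Some(Yield::Idle) | None => std::thread::yield_now(),
            }
        }
    }

    fn main() {
        static DONE: AtomicBool = AtomicBool::new(false);
        rayon_core::spawn(|| DONE.store(true, Ordering::Release));
        wait_for(&DONE);
    }

This is also what keeps the single-threaded fallback usable: on a threadless target, the yield is the only opening a queued `spawn` gets, which is exactly what the `yield_now_to_spawn` test below exercises.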
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
index ac750a6..6143e57 100644
--- a/src/thread_pool/test.rs
+++ b/src/thread_pool/test.rs
@@ -16,6 +16,7 @@ fn panic_propagate() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
@@ -43,6 +44,7 @@ fn join_a_lot(n: usize) {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
@@ -89,6 +91,7 @@ fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
// Note: we first tried to force failure with a `usize::MAX` stack, but
// macOS and Windows weren't fazed, or at least didn't fail the way we want.
@@ -115,6 +118,7 @@ fn failed_thread_stack() {
}
#[test]
+#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
@@ -139,6 +143,7 @@ fn panic_thread_name() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -147,6 +152,7 @@ fn self_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
@@ -166,6 +172,7 @@ fn mutual_install() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
@@ -194,6 +201,7 @@ fn mutual_install_sleepy() {
#[test]
#[allow(deprecated)]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
@@ -219,6 +227,7 @@ macro_rules! test_scope_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -226,6 +235,7 @@ fn scope_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -250,6 +260,7 @@ macro_rules! test_spawn_order {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
@@ -257,6 +268,7 @@ fn spawn_lifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
@@ -264,6 +276,7 @@ fn spawn_fifo_order() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
// Create matching scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
@@ -300,6 +313,7 @@ fn nested_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
// Create matching fifo scopes for every thread pool.
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
@@ -336,6 +350,7 @@ fn nested_fifo_scopes() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -351,6 +366,7 @@ fn in_place_scope_no_deadlock() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
@@ -364,3 +380,39 @@ fn in_place_scope_fifo_no_deadlock() {
rx_ref.recv().unwrap();
});
}
+
+#[test]
+fn yield_now_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_now();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
+
+#[test]
+fn yield_local_to_spawn() {
+ let (tx, rx) = crossbeam_channel::bounded(1);
+
+ // Queue a regular spawn.
+ crate::spawn(move || tx.send(22).unwrap());
+
+ // The single-threaded fallback mode (for wasm etc.) won't
+ // get a chance to run the spawn if we never yield to it.
+ crate::registry::in_worker(move |_, _| {
+ crate::yield_local();
+ });
+
+ // The spawn **must** have started by now, but we still might have to wait
+ // for it to finish if a different thread stole it first.
+ assert_eq!(22, rx.recv().unwrap());
+}
diff --git a/tests/double_init_fail.rs b/tests/double_init_fail.rs
index b3ddbeb..1591530 100644
--- a/tests/double_init_fail.rs
+++ b/tests/double_init_fail.rs
@@ -2,6 +2,7 @@ use rayon_core::ThreadPoolBuilder;
use std::error::Error;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn double_init_fail() {
let result1 = ThreadPoolBuilder::new().build_global();
assert!(result1.is_ok());
diff --git a/tests/init_zero_threads.rs b/tests/init_zero_threads.rs
index ebd73c5..3c1ad25 100644
--- a/tests/init_zero_threads.rs
+++ b/tests/init_zero_threads.rs
@@ -1,6 +1,7 @@
use rayon_core::ThreadPoolBuilder;
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn init_zero_threads() {
ThreadPoolBuilder::new()
.num_threads(0)
diff --git a/tests/scoped_threadpool.rs b/tests/scoped_threadpool.rs
index db3d0b8..534e8bb 100644
--- a/tests/scoped_threadpool.rs
+++ b/tests/scoped_threadpool.rs
@@ -7,6 +7,7 @@ struct Local(i32);
scoped_tls::scoped_thread_local!(static LOCAL: Local);
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn missing_scoped_tls() {
LOCAL.set(&Local(42), || {
let pool = ThreadPoolBuilder::new()
@@ -21,6 +22,7 @@ fn missing_scoped_tls() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
@@ -63,6 +65,7 @@ fn spawn_scoped_tls_threadpool() {
}
#[test]
+#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn build_scoped_tls_threadpool() {
LOCAL.set(&Local(42), || {
LOCAL.with(|x| {
diff --git a/tests/stack_overflow_crash.rs b/tests/stack_overflow_crash.rs
index e87e151..7dcde43 100644
--- a/tests/stack_overflow_crash.rs
+++ b/tests/stack_overflow_crash.rs
@@ -40,10 +40,12 @@ fn overflow_code() -> Option<i32> {
}
#[test]
+#[cfg_attr(not(any(unix, windows)), ignore)]
fn stack_overflow_crash() {
// First check that the recursive call actually causes a stack overflow,
// and does not get optimized away.
let status = run_ignored("run_with_small_stack");
+ assert!(!status.success());
#[cfg(any(unix, windows))]
assert_eq!(status.code(), overflow_code());
#[cfg(target_os = "linux")]