aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:23:46 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-10 07:23:46 +0000
commit9589d93b05df0090de34026ddb1107c9832d66f7 (patch)
tree00997afec5942aa12d428a22e8e59c0bf76ec48e
parentc8c83ceeb6360a1d3d21d7e1bf53fd64699128bd (diff)
parent30628deab525306a799417fc9c0a96a3764428db (diff)
downloadrayon-core-android13-mainline-art-release.tar.gz
Change-Id: I68b928ce5470f67e4f956dec5d1232bf27d03dab
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp17
-rw-r--r--Cargo.toml6
-rw-r--r--Cargo.toml.orig6
-rw-r--r--METADATA10
-rw-r--r--TEST_MAPPING17
-rw-r--r--cargo2android.json4
-rw-r--r--src/latch.rs42
-rw-r--r--src/lib.rs32
-rw-r--r--src/registry.rs18
-rw-r--r--src/scope/mod.rs255
-rw-r--r--src/scope/test.rs4
-rw-r--r--src/thread_pool/mod.rs25
-rw-r--r--src/thread_pool/test.rs30
-rw-r--r--src/util.rs10
15 files changed, 364 insertions, 114 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index c28c857..c8aa4e4 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "dc13cb7875ad43c7d1ea8b1e504b09c031f7ed5a"
+ "sha1": "ebcb09b1dc53211c6b5abdf4dc5b40e4bcd0a965"
}
}
diff --git a/Android.bp b/Android.bp
index 6e62871..65b0527 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,4 @@
-// This file is generated by cargo2android.py --run --device --dependencies.
+// This file is generated by cargo2android.py --config cargo2android.json.
// Do not modify this file as changes will be overridden on upgrade.
package {
@@ -41,6 +41,8 @@ rust_library {
name: "librayon_core",
host_supported: true,
crate_name: "rayon_core",
+ cargo_env_compat: true,
+ cargo_pkg_version: "1.9.1",
srcs: ["src/lib.rs"],
edition: "2018",
rustlibs: [
@@ -51,16 +53,3 @@ rust_library {
"libnum_cpus",
],
}
-
-// dependent_library ["feature_list"]
-// autocfg-1.0.1
-// cfg-if-1.0.0
-// crossbeam-channel-0.5.0 "crossbeam-utils,default,std"
-// crossbeam-deque-0.8.0 "crossbeam-epoch,crossbeam-utils,default,std"
-// crossbeam-epoch-0.9.3 "alloc,lazy_static,std"
-// crossbeam-utils-0.8.3 "default,lazy_static,std"
-// lazy_static-1.4.0
-// libc-0.2.88 "default,std"
-// memoffset-0.6.1 "default"
-// num_cpus-1.13.0
-// scopeguard-1.1.0
diff --git a/Cargo.toml b/Cargo.toml
index 1f825b6..8b110fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
[package]
edition = "2018"
name = "rayon-core"
-version = "1.9.0"
+version = "1.9.1"
authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
build = "build.rs"
links = "rayon-core"
@@ -64,10 +64,10 @@ version = "1"
[dependencies.num_cpus]
version = "1.2"
[dev-dependencies.rand]
-version = "0.7"
+version = "0.8"
[dev-dependencies.rand_xorshift]
-version = "0.2"
+version = "0.3"
[dev-dependencies.scoped-tls]
version = "1.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index b2082e4..d31ed22 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,6 +1,6 @@
[package]
name = "rayon-core"
-version = "1.9.0" # reminder to update html_root_url attribute
+version = "1.9.1" # reminder to update html_root_url attribute
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
@@ -23,8 +23,8 @@ crossbeam-deque = "0.8.0"
crossbeam-utils = "0.8.0"
[dev-dependencies]
-rand = "0.7"
-rand_xorshift = "0.2"
+rand = "0.8"
+rand_xorshift = "0.3"
scoped-tls = "1.0"
[target.'cfg(unix)'.dev-dependencies]
diff --git a/METADATA b/METADATA
index caa3678..7814725 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/rayon-core/rayon-core-1.9.0.crate"
+ value: "https://static.crates.io/crates/rayon-core/rayon-core-1.9.1.crate"
}
- version: "1.9.0"
+ version: "1.9.1"
license_type: NOTICE
last_upgrade_date {
- year: 2020
- month: 12
- day: 21
+ year: 2021
+ month: 5
+ day: 19
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..3cbd48d
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,17 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "imports": [
+ {
+ "path": "external/rust/crates/base64"
+ },
+ {
+ "path": "external/rust/crates/tinytemplate"
+ },
+ {
+ "path": "external/rust/crates/tinyvec"
+ },
+ {
+ "path": "external/rust/crates/unicode-xid"
+ }
+ ]
+}
diff --git a/cargo2android.json b/cargo2android.json
new file mode 100644
index 0000000..bf78496
--- /dev/null
+++ b/cargo2android.json
@@ -0,0 +1,4 @@
+{
+ "device": true,
+ "run": true
+} \ No newline at end of file
diff --git a/src/latch.rs b/src/latch.rs
index 0965bb9..1d573b7 100644
--- a/src/latch.rs
+++ b/src/latch.rs
@@ -218,6 +218,7 @@ impl<'r> Latch for SpinLatch<'r> {
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
+#[derive(Debug)]
pub(super) struct LockLatch {
m: Mutex<bool>,
v: Condvar,
@@ -297,11 +298,9 @@ impl CountLatch {
/// Decrements the latch counter by one. If this is the final
/// count, then the latch is **set**, and calls to `probe()` will
- /// return true. Returns whether the latch was set. This is an
- /// internal operation, as it does not tickle, and to fail to
- /// tickle would lead to deadlock.
+ /// return true. Returns whether the latch was set.
#[inline]
- fn set(&self) -> bool {
+ pub(super) fn set(&self) -> bool {
if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
self.core_latch.set();
true
@@ -328,6 +327,41 @@ impl AsCoreLatch for CountLatch {
}
}
+#[derive(Debug)]
+pub(super) struct CountLockLatch {
+ lock_latch: LockLatch,
+ counter: AtomicUsize,
+}
+
+impl CountLockLatch {
+ #[inline]
+ pub(super) fn new() -> CountLockLatch {
+ CountLockLatch {
+ lock_latch: LockLatch::new(),
+ counter: AtomicUsize::new(1),
+ }
+ }
+
+ #[inline]
+ pub(super) fn increment(&self) {
+ let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
+ debug_assert!(old_counter != 0);
+ }
+
+ pub(super) fn wait(&self) {
+ self.lock_latch.wait();
+ }
+}
+
+impl Latch for CountLockLatch {
+ #[inline]
+ fn set(&self) {
+ if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
+ self.lock_latch.set();
+ }
+ }
+}
+
impl<'a, L> Latch for &'a L
where
L: Latch,
diff --git a/src/lib.rs b/src/lib.rs
index f514bb6..2df24b2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,30 @@
+//! Rayon-core houses the core stable APIs of Rayon.
//!
-//! [Under construction](https://github.com/rayon-rs/rayon/issues/231)
+//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there.
+//!
+//! [`join`] is used to take two closures and potentially run them in parallel.
+//! - It will run in parallel if task B gets stolen before task A can finish.
+//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B.
+//!
+//! [`scope`] creates a scope in which you can run any number of parallel tasks.
+//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed.
+//! The scope will exist until all tasks spawned within the scope have been completed.
+//!
+//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function.
+//!
+//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one.
+//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque,
+//! where it becomes available for work stealing from other threads in the local threadpool.
+//!
+//! [`join`]: fn.join.html
+//! [`scope`]: fn.scope.html
+//! [`scope()`]: fn.scope.html
+//! [`spawn`]: fn.spawn.html
+//! [`ThreadPool`]: struct.threadpool.html
+//! [`install()`]: struct.ThreadPool.html#method.install
+//! [`spawn()`]: struct.ThreadPool.html#method.spawn
+//! [`join()`]: struct.ThreadPool.html#method.join
+//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
//! ## Restricting multiple versions
//!
@@ -47,15 +72,14 @@ mod sleep;
mod spawn;
mod thread_pool;
mod unwind;
-mod util;
mod compile_fail;
mod test;
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
-pub use self::scope::{scope, Scope};
-pub use self::scope::{scope_fifo, ScopeFifo};
+pub use self::scope::{in_place_scope, scope, Scope};
+pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
pub use self::spawn::{spawn, spawn_fifo};
pub use self::thread_pool::current_thread_has_pending_tasks;
pub use self::thread_pool::current_thread_index;
diff --git a/src/registry.rs b/src/registry.rs
index 46ae10a..4156b90 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -4,7 +4,6 @@ use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
-use crate::util::leak;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
@@ -159,15 +158,15 @@ pub(super) struct Registry {
/// ////////////////////////////////////////////////////////////////////////
/// Initialization
-static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
+static mut THE_REGISTRY: Option<Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();
/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
-fn global_registry() -> &'static Arc<Registry> {
+pub(super) fn global_registry() -> &'static Arc<Registry> {
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
- .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
+ .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
@@ -191,15 +190,12 @@ where
let mut result = Err(ThreadPoolBuildError::new(
ErrorKind::GlobalPoolAlreadyInitialized,
));
+
THE_REGISTRY_SET.call_once(|| {
- result = registry().map(|registry| {
- let registry = leak(registry);
- unsafe {
- THE_REGISTRY = Some(registry);
- }
- registry
- });
+ result = registry()
+ .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) })
});
+
result
}
diff --git a/src/scope/mod.rs b/src/scope/mod.rs
index a41d408..543aa26 100644
--- a/src/scope/mod.rs
+++ b/src/scope/mod.rs
@@ -1,12 +1,13 @@
//! Methods for custom fork-join scopes, created by the [`scope()`]
-//! function. These are a more flexible alternative to [`join()`].
+//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
//!
//! [`scope()`]: fn.scope.html
+//! [`in_place_scope()`]: fn.in_place_scope.html
//! [`join()`]: ../join/join.fn.html
use crate::job::{HeapJob, JobFifo};
-use crate::latch::CountLatch;
-use crate::registry::{in_worker, Registry, WorkerThread};
+use crate::latch::{CountLatch, CountLockLatch, Latch};
+use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
use crate::unwind;
use std::any::Any;
use std::fmt;
@@ -37,21 +38,37 @@ pub struct ScopeFifo<'scope> {
fifos: Vec<JobFifo>,
}
-struct ScopeBase<'scope> {
- /// thread where `scope()` was executed (note that individual jobs
- /// may be executing on different worker threads, though they
- /// should always be within the same pool of threads)
- owner_thread_index: usize,
+enum ScopeLatch {
+ /// A latch for scopes created on a rayon thread which will participate in work-
+ /// stealing while it waits for completion. This thread is not necessarily part
+ /// of the same registry as the scope itself!
+ Stealing {
+ latch: CountLatch,
+ /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
+ /// with registry B, when a job completes in a thread of registry B, we may
+ /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A.
+ /// That means we need a reference to registry A (since at that point we will
+ /// only have a reference to registry B), so we stash it here.
+ registry: Arc<Registry>,
+ /// The index of the worker to wake in `registry`
+ worker_index: usize,
+ },
+
+ /// A latch for scopes created on a non-rayon thread which will block to wait.
+ Blocking { latch: CountLockLatch },
+}
- /// thread registry where `scope()` was executed.
+struct ScopeBase<'scope> {
+ /// thread registry where `scope()` was executed or where `in_place_scope()`
+ /// should spawn jobs.
registry: Arc<Registry>,
/// if some job panicked, the error is stored here; it will be
/// propagated to the one who created the scope
panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
- /// latch to set when the counter drops to zero (and hence this scope is complete)
- job_completed_latch: CountLatch,
+ /// latch to track job counts
+ job_completed_latch: ScopeLatch,
/// You can think of a scope as containing a list of closures to execute,
/// all of which outlive `'scope`. They're not actually required to be
@@ -289,8 +306,8 @@ where
R: Send,
{
in_worker(|owner_thread, _| {
- let scope = Scope::<'scope>::new(owner_thread);
- unsafe { scope.base.complete(owner_thread, || op(&scope)) }
+ let scope = Scope::<'scope>::new(Some(owner_thread), None);
+ scope.base.complete(Some(owner_thread), || op(&scope))
})
}
@@ -380,16 +397,89 @@ where
R: Send,
{
in_worker(|owner_thread, _| {
- let scope = ScopeFifo::<'scope>::new(owner_thread);
- unsafe { scope.base.complete(owner_thread, || op(&scope)) }
+ let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
+ scope.base.complete(Some(owner_thread), || op(&scope))
})
}
+/// Creates a "fork-join" scope `s` and invokes the closure with a
+/// reference to `s`. This closure can then spawn asynchronous tasks
+/// into `s`. Those tasks may run asynchronously with respect to the
+/// closure; they may themselves spawn additional tasks into `s`. When
+/// the closure returns, it will block until all tasks that have been
+/// spawned into `s` complete.
+///
+/// This is just like `scope()` except the closure runs on the same thread
+/// that calls `in_place_scope()`. Only work that it spawns runs in the
+/// thread pool.
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `in_place_scope()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn()`, it will
+/// execute, even if the spawning task should later panic. `in_place_scope()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R,
+{
+ do_in_place_scope(None, op)
+}
+
+pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
+where
+ OP: FnOnce(&Scope<'scope>) -> R,
+{
+ let thread = unsafe { WorkerThread::current().as_ref() };
+ let scope = Scope::<'scope>::new(thread, registry);
+ scope.base.complete(thread, || op(&scope))
+}
+
+/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
+/// closure with a reference to `s`. This closure can then spawn
+/// asynchronous tasks into `s`. Those tasks may run asynchronously with
+/// respect to the closure; they may themselves spawn additional tasks
+/// into `s`. When the closure returns, it will block until all tasks
+/// that have been spawned into `s` complete.
+///
+/// This is just like `scope_fifo()` except the closure runs on the same thread
+/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
+/// thread pool.
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
+/// any of the spawned jobs, that panic will be propagated and the
+/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
+/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
+/// returns once all spawned jobs have completed, and any panics are
+/// propagated at that point.
+pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+{
+ do_in_place_scope_fifo(None, op)
+}
+
+pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
+where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+{
+ let thread = unsafe { WorkerThread::current().as_ref() };
+ let scope = ScopeFifo::<'scope>::new(thread, registry);
+ scope.base.complete(thread, || op(&scope))
+}
+
impl<'scope> Scope<'scope> {
- fn new(owner_thread: &WorkerThread) -> Self {
- Scope {
- base: ScopeBase::new(owner_thread),
- }
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let base = ScopeBase::new(owner, registry);
+ Scope { base }
}
/// Spawns a job into the fork-join scope `self`. This job will
@@ -457,18 +547,18 @@ impl<'scope> Scope<'scope> {
// Since `Scope` implements `Sync`, we can't be sure that we're still in a
// thread of this pool, so we can't just push to the local worker thread.
+ // Also, this might be an in-place scope.
self.base.registry.inject_or_push(job_ref);
}
}
}
impl<'scope> ScopeFifo<'scope> {
- fn new(owner_thread: &WorkerThread) -> Self {
- let num_threads = owner_thread.registry().num_threads();
- ScopeFifo {
- base: ScopeBase::new(owner_thread),
- fifos: (0..num_threads).map(|_| JobFifo::new()).collect(),
- }
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let base = ScopeBase::new(owner, registry);
+ let num_threads = base.registry.num_threads();
+ let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
+ ScopeFifo { base, fifos }
}
/// Spawns a job into the fork-join scope `self`. This job will
@@ -510,13 +600,17 @@ impl<'scope> ScopeFifo<'scope> {
}
impl<'scope> ScopeBase<'scope> {
- /// Creates the base of a new scope for the given worker thread
- fn new(owner_thread: &WorkerThread) -> Self {
+ /// Creates the base of a new scope for the given registry
+ fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
+ let registry = registry.unwrap_or_else(|| match owner {
+ Some(owner) => owner.registry(),
+ None => global_registry(),
+ });
+
ScopeBase {
- owner_thread_index: owner_thread.index(),
- registry: owner_thread.registry().clone(),
+ registry: Arc::clone(registry),
panic: AtomicPtr::new(ptr::null_mut()),
- job_completed_latch: CountLatch::new(),
+ job_completed_latch: ScopeLatch::new(owner),
marker: PhantomData,
}
}
@@ -527,22 +621,19 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- ///
- /// Unsafe because it must be executed on a worker thread.
- unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R
+ fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
where
FUNC: FnOnce() -> R,
{
let result = self.execute_job_closure(func);
- self.steal_till_jobs_complete(owner_thread);
+ self.job_completed_latch.wait(owner);
+ self.maybe_propagate_panic();
result.unwrap() // only None if `op` panicked, and that would have been propagated
}
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- ///
- /// Unsafe because it must be executed on a worker thread.
- unsafe fn execute_job<FUNC>(&self, func: FUNC)
+ fn execute_job<FUNC>(&self, func: FUNC)
where
FUNC: FnOnce(),
{
@@ -552,25 +643,24 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- ///
- /// Unsafe because this must be executed on a worker thread.
- unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_ok();
+ self.job_completed_latch.set();
Some(r)
}
Err(err) => {
self.job_panicked(err);
+ self.job_completed_latch.set();
None
}
}
}
- unsafe fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
+ fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
let nil = ptr::null_mut();
let mut err = Box::new(err); // box up the fat ptr
@@ -581,36 +671,73 @@ impl<'scope> ScopeBase<'scope> {
{
mem::forget(err); // ownership now transferred into self.panic
}
-
- self.job_completed_latch
- .set_and_tickle_one(&self.registry, self.owner_thread_index);
}
- unsafe fn job_completed_ok(&self) {
- self.job_completed_latch
- .set_and_tickle_one(&self.registry, self.owner_thread_index);
- }
-
- unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) {
- // wait for job counter to reach 0:
- owner_thread.wait_until(&self.job_completed_latch);
-
+ fn maybe_propagate_panic(&self) {
// propagate panic, if any occurred; at this point, all
// outstanding jobs have completed, so we can use a relaxed
// ordering:
let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
if !panic.is_null() {
- let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic);
+ let value = unsafe { Box::from_raw(panic) };
unwind::resume_unwinding(*value);
}
}
}
+impl ScopeLatch {
+ fn new(owner: Option<&WorkerThread>) -> Self {
+ match owner {
+ Some(owner) => ScopeLatch::Stealing {
+ latch: CountLatch::new(),
+ registry: Arc::clone(owner.registry()),
+ worker_index: owner.index(),
+ },
+ None => ScopeLatch::Blocking {
+ latch: CountLockLatch::new(),
+ },
+ }
+ }
+
+ fn increment(&self) {
+ match self {
+ ScopeLatch::Stealing { latch, .. } => latch.increment(),
+ ScopeLatch::Blocking { latch } => latch.increment(),
+ }
+ }
+
+ fn set(&self) {
+ match self {
+ ScopeLatch::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => latch.set_and_tickle_one(registry, *worker_index),
+ ScopeLatch::Blocking { latch } => latch.set(),
+ }
+ }
+
+ fn wait(&self, owner: Option<&WorkerThread>) {
+ match self {
+ ScopeLatch::Stealing {
+ latch,
+ registry,
+ worker_index,
+ } => unsafe {
+ let owner = owner.expect("owner thread");
+ debug_assert_eq!(registry.id(), owner.registry().id());
+ debug_assert_eq!(*worker_index, owner.index());
+ owner.wait_until(latch);
+ },
+ ScopeLatch::Blocking { latch } => latch.wait(),
+ }
+ }
+}
+
impl<'scope> fmt::Debug for Scope<'scope> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scope")
.field("pool_id", &self.base.registry.id())
- .field("owner_thread_index", &self.base.owner_thread_index)
.field("panic", &self.base.panic)
.field("job_completed_latch", &self.base.job_completed_latch)
.finish()
@@ -622,9 +749,23 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> {
fmt.debug_struct("ScopeFifo")
.field("num_fifos", &self.fifos.len())
.field("pool_id", &self.base.registry.id())
- .field("owner_thread_index", &self.base.owner_thread_index)
.field("panic", &self.base.panic)
.field("job_completed_latch", &self.base.job_completed_latch)
.finish()
}
}
+
+impl fmt::Debug for ScopeLatch {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ ScopeLatch::Stealing { latch, .. } => fmt
+ .debug_tuple("ScopeLatch::Stealing")
+ .field(latch)
+ .finish(),
+ ScopeLatch::Blocking { latch } => fmt
+ .debug_tuple("ScopeLatch::Blocking")
+ .field(latch)
+ .finish(),
+ }
+ }
+}
diff --git a/src/scope/test.rs b/src/scope/test.rs
index 8cb82b6..471d78e 100644
--- a/src/scope/test.rs
+++ b/src/scope/test.rs
@@ -120,13 +120,13 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> {
let children = if depth == 0 {
vec![]
} else {
- (0..rng.gen_range(0, 4)) // somewhere between 0 and 3 children at each level
+ (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level
.map(|_| random_tree1(depth - 1, rng))
.collect()
};
Tree {
- value: rng.gen_range(0, 1_000_000),
+ value: rng.gen_range(0..1_000_000),
children,
}
}
diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs
index 2209f63..5edaedc 100644
--- a/src/thread_pool/mod.rs
+++ b/src/thread_pool/mod.rs
@@ -5,6 +5,7 @@
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
+use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
#[allow(deprecated)]
use crate::Configuration;
@@ -221,6 +222,30 @@ impl ThreadPool {
self.install(|| scope_fifo(op))
}
+ /// Creates a scope that spawns work into this thread-pool.
+ ///
+ /// See also: [the `in_place_scope()` function][in_place_scope].
+ ///
+ /// [in_place_scope]: fn.in_place_scope.html
+ pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&Scope<'scope>) -> R,
+ {
+ do_in_place_scope(Some(&self.registry), op)
+ }
+
+ /// Creates a scope that spawns work into this thread-pool in FIFO order.
+ ///
+ /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo].
+ ///
+ /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html
+ pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
+ where
+ OP: FnOnce(&ScopeFifo<'scope>) -> R,
+ {
+ do_in_place_scope_fifo(Some(&self.registry), op)
+ }
+
/// Spawns an asynchronous task in this thread-pool. This task will
/// run in the implicit, global scope, which means that it may outlast
/// the current stack frame -- therefore, it cannot capture any references
diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs
index 8d1c90c..1510e37 100644
--- a/src/thread_pool/test.rs
+++ b/src/thread_pool/test.rs
@@ -336,3 +336,33 @@ fn nested_fifo_scopes() {
});
assert_eq!(counter.into_inner(), pools.len());
}
+
+#[test]
+fn in_place_scope_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+ let rx_ref = &rx;
+ pool.in_place_scope(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}
+
+#[test]
+fn in_place_scope_fifo_no_deadlock() {
+ let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+ let (tx, rx) = channel();
+ let rx_ref = &rx;
+ pool.in_place_scope_fifo(move |s| {
+ // With regular scopes this closure would never run because this scope op
+ // itself would block the only worker thread.
+ s.spawn_fifo(move |_| {
+ tx.send(()).unwrap();
+ });
+ rx_ref.recv().unwrap();
+ });
+}
diff --git a/src/util.rs b/src/util.rs
deleted file mode 100644
index 27d375a..0000000
--- a/src/util.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-use std::mem;
-
-pub(super) fn leak<T>(v: T) -> &'static T {
- unsafe {
- let b = Box::new(v);
- let p: *const T = &*b;
- mem::forget(b); // leak our reference, so that `b` is never freed
- &*p
- }
-}