diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:08:28 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:08:28 +0000 |
commit | adc1bb5f7a79f27ce2466bde9f555bbf0dc3f31f (patch) | |
tree | 00997afec5942aa12d428a22e8e59c0bf76ec48e | |
parent | c8c83ceeb6360a1d3d21d7e1bf53fd64699128bd (diff) | |
parent | 30628deab525306a799417fc9c0a96a3764428db (diff) | |
download | rayon-core-android13-mainline-tethering-release.tar.gz |
Snap for 8564071 from 30628deab525306a799417fc9c0a96a3764428db to mainline-tethering-releaseaml_tet_331910040aml_tet_331820050aml_tet_331711040aml_tet_331511160aml_tet_331511000aml_tet_331412030aml_tet_331312080aml_tet_331117000aml_tet_331012080aml_tet_330911010aml_tet_330812150android13-mainline-tethering-release
Change-Id: Ic9f546134d33bea0690f4532d4e7171b2d6a3d0e
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 17 | ||||
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | Cargo.toml.orig | 6 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | TEST_MAPPING | 17 | ||||
-rw-r--r-- | cargo2android.json | 4 | ||||
-rw-r--r-- | src/latch.rs | 42 | ||||
-rw-r--r-- | src/lib.rs | 32 | ||||
-rw-r--r-- | src/registry.rs | 18 | ||||
-rw-r--r-- | src/scope/mod.rs | 255 | ||||
-rw-r--r-- | src/scope/test.rs | 4 | ||||
-rw-r--r-- | src/thread_pool/mod.rs | 25 | ||||
-rw-r--r-- | src/thread_pool/test.rs | 30 | ||||
-rw-r--r-- | src/util.rs | 10 |
15 files changed, 364 insertions, 114 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index c28c857..c8aa4e4 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "dc13cb7875ad43c7d1ea8b1e504b09c031f7ed5a" + "sha1": "ebcb09b1dc53211c6b5abdf4dc5b40e4bcd0a965" } } @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --run --device --dependencies. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -41,6 +41,8 @@ rust_library { name: "librayon_core", host_supported: true, crate_name: "rayon_core", + cargo_env_compat: true, + cargo_pkg_version: "1.9.1", srcs: ["src/lib.rs"], edition: "2018", rustlibs: [ @@ -51,16 +53,3 @@ rust_library { "libnum_cpus", ], } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// cfg-if-1.0.0 -// crossbeam-channel-0.5.0 "crossbeam-utils,default,std" -// crossbeam-deque-0.8.0 "crossbeam-epoch,crossbeam-utils,default,std" -// crossbeam-epoch-0.9.3 "alloc,lazy_static,std" -// crossbeam-utils-0.8.3 "default,lazy_static,std" -// lazy_static-1.4.0 -// libc-0.2.88 "default,std" -// memoffset-0.6.1 "default" -// num_cpus-1.13.0 -// scopeguard-1.1.0 @@ -13,7 +13,7 @@ [package] edition = "2018" name = "rayon-core" -version = "1.9.0" +version = "1.9.1" authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"] build = "build.rs" links = "rayon-core" @@ -64,10 +64,10 @@ version = "1" [dependencies.num_cpus] version = "1.2" [dev-dependencies.rand] -version = "0.7" +version = "0.8" [dev-dependencies.rand_xorshift] -version = "0.2" +version = "0.3" [dev-dependencies.scoped-tls] version = "1.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index b2082e4..d31ed22 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "rayon-core" -version = "1.9.0" # reminder to update html_root_url attribute +version = "1.9.1" # reminder to update html_root_url attribute authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"] description = "Core APIs for Rayon" @@ -23,8 +23,8 @@ crossbeam-deque = "0.8.0" crossbeam-utils = "0.8.0" [dev-dependencies] -rand = "0.7" -rand_xorshift = "0.2" +rand = "0.8" +rand_xorshift = "0.3" scoped-tls = "1.0" [target.'cfg(unix)'.dev-dependencies] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/rayon-core/rayon-core-1.9.0.crate" + value: "https://static.crates.io/crates/rayon-core/rayon-core-1.9.1.crate" } - version: "1.9.0" + version: "1.9.1" license_type: NOTICE last_upgrade_date { - year: 2020 - month: 12 - day: 21 + year: 2021 + month: 5 + day: 19 } } diff --git a/TEST_MAPPING b/TEST_MAPPING new file mode 100644 index 0000000..3cbd48d --- /dev/null +++ b/TEST_MAPPING @@ -0,0 +1,17 @@ +// Generated by update_crate_tests.py for tests that depend on this crate. +{ + "imports": [ + { + "path": "external/rust/crates/base64" + }, + { + "path": "external/rust/crates/tinytemplate" + }, + { + "path": "external/rust/crates/tinyvec" + }, + { + "path": "external/rust/crates/unicode-xid" + } + ] +} diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..bf78496 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,4 @@ +{ + "device": true, + "run": true +}
\ No newline at end of file diff --git a/src/latch.rs b/src/latch.rs index 0965bb9..1d573b7 100644 --- a/src/latch.rs +++ b/src/latch.rs @@ -218,6 +218,7 @@ impl<'r> Latch for SpinLatch<'r> { /// A Latch starts as false and eventually becomes true. You can block /// until it becomes true. +#[derive(Debug)] pub(super) struct LockLatch { m: Mutex<bool>, v: Condvar, @@ -297,11 +298,9 @@ impl CountLatch { /// Decrements the latch counter by one. If this is the final /// count, then the latch is **set**, and calls to `probe()` will - /// return true. Returns whether the latch was set. This is an - /// internal operation, as it does not tickle, and to fail to - /// tickle would lead to deadlock. + /// return true. Returns whether the latch was set. #[inline] - fn set(&self) -> bool { + pub(super) fn set(&self) -> bool { if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { self.core_latch.set(); true @@ -328,6 +327,41 @@ impl AsCoreLatch for CountLatch { } } +#[derive(Debug)] +pub(super) struct CountLockLatch { + lock_latch: LockLatch, + counter: AtomicUsize, +} + +impl CountLockLatch { + #[inline] + pub(super) fn new() -> CountLockLatch { + CountLockLatch { + lock_latch: LockLatch::new(), + counter: AtomicUsize::new(1), + } + } + + #[inline] + pub(super) fn increment(&self) { + let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); + debug_assert!(old_counter != 0); + } + + pub(super) fn wait(&self) { + self.lock_latch.wait(); + } +} + +impl Latch for CountLockLatch { + #[inline] + fn set(&self) { + if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { + self.lock_latch.set(); + } + } +} + impl<'a, L> Latch for &'a L where L: Latch, @@ -1,5 +1,30 @@ +//! Rayon-core houses the core stable APIs of Rayon. //! -//! [Under construction](https://github.com/rayon-rs/rayon/issues/231) +//! These APIs have been mirrored in the Rayon crate and it is recommended to use these from there. +//! +//! [`join`] is used to take two closures and potentially run them in parallel. +//! - It will run in parallel if task B gets stolen before task A can finish. +//! - It will run sequentially if task A finishes before task B is stolen and can continue on task B. +//! +//! [`scope`] creates a scope in which you can run any number of parallel tasks. +//! These tasks can spawn nested tasks and scopes, but given the nature of work stealing, the order of execution can not be guaranteed. +//! The scope will exist until all tasks spawned within the scope have been completed. +//! +//! [`spawn`] add a task into the 'static' or 'global' scope, or a local scope created by the [`scope()`] function. +//! +//! [`ThreadPool`] can be used to create your own thread pools (using [`ThreadPoolBuilder`]) or to customize the global one. +//! Tasks spawned within the pool (using [`install()`], [`join()`], etc.) will be added to a deque, +//! where it becomes available for work stealing from other threads in the local threadpool. +//! +//! [`join`]: fn.join.html +//! [`scope`]: fn.scope.html +//! [`scope()`]: fn.scope.html +//! [`spawn`]: fn.spawn.html +//! [`ThreadPool`]: struct.threadpool.html +//! [`install()`]: struct.ThreadPool.html#method.install +//! [`spawn()`]: struct.ThreadPool.html#method.spawn +//! [`join()`]: struct.ThreadPool.html#method.join +//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html //! //! ## Restricting multiple versions //! @@ -47,15 +72,14 @@ mod sleep; mod spawn; mod thread_pool; mod unwind; -mod util; mod compile_fail; mod test; pub use self::join::{join, join_context}; pub use self::registry::ThreadBuilder; -pub use self::scope::{scope, Scope}; -pub use self::scope::{scope_fifo, ScopeFifo}; +pub use self::scope::{in_place_scope, scope, Scope}; +pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; pub use self::spawn::{spawn, spawn_fifo}; pub use self::thread_pool::current_thread_has_pending_tasks; pub use self::thread_pool::current_thread_index; diff --git a/src/registry.rs b/src/registry.rs index 46ae10a..4156b90 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -4,7 +4,6 @@ use crate::log::Event::*; use crate::log::Logger; use crate::sleep::Sleep; use crate::unwind; -use crate::util::leak; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, }; @@ -159,15 +158,15 @@ pub(super) struct Registry { /// //////////////////////////////////////////////////////////////////////// /// Initialization -static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None; +static mut THE_REGISTRY: Option<Arc<Registry>> = None; static THE_REGISTRY_SET: Once = Once::new(); /// Starts the worker threads (if that has not already happened). If /// initialization has not already occurred, use the default /// configuration. -fn global_registry() -> &'static Arc<Registry> { +pub(super) fn global_registry() -> &'static Arc<Registry> { set_global_registry(|| Registry::new(ThreadPoolBuilder::new())) - .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) }) + .or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) }) .expect("The global thread pool has not been initialized.") } @@ -191,15 +190,12 @@ where let mut result = Err(ThreadPoolBuildError::new( ErrorKind::GlobalPoolAlreadyInitialized, )); + THE_REGISTRY_SET.call_once(|| { - result = registry().map(|registry| { - let registry = leak(registry); - unsafe { - THE_REGISTRY = Some(registry); - } - registry - }); + result = registry() + .map(|registry: Arc<Registry>| unsafe { &*THE_REGISTRY.get_or_insert(registry) }) }); + result } diff --git a/src/scope/mod.rs b/src/scope/mod.rs index a41d408..543aa26 100644 --- a/src/scope/mod.rs +++ b/src/scope/mod.rs @@ -1,12 +1,13 @@ //! Methods for custom fork-join scopes, created by the [`scope()`] -//! function. These are a more flexible alternative to [`join()`]. +//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. //! //! [`scope()`]: fn.scope.html +//! [`in_place_scope()`]: fn.in_place_scope.html //! [`join()`]: ../join/join.fn.html use crate::job::{HeapJob, JobFifo}; -use crate::latch::CountLatch; -use crate::registry::{in_worker, Registry, WorkerThread}; +use crate::latch::{CountLatch, CountLockLatch, Latch}; +use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::unwind; use std::any::Any; use std::fmt; @@ -37,21 +38,37 @@ pub struct ScopeFifo<'scope> { fifos: Vec<JobFifo>, } -struct ScopeBase<'scope> { - /// thread where `scope()` was executed (note that individual jobs - /// may be executing on different worker threads, though they - /// should always be within the same pool of threads) - owner_thread_index: usize, +enum ScopeLatch { + /// A latch for scopes created on a rayon thread which will participate in work- + /// stealing while it waits for completion. This thread is not necessarily part + /// of the same registry as the scope itself! + Stealing { + latch: CountLatch, + /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool + /// with registry B, when a job completes in a thread of registry B, we may + /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. + /// That means we need a reference to registry A (since at that point we will + /// only have a reference to registry B), so we stash it here. + registry: Arc<Registry>, + /// The index of the worker to wake in `registry` + worker_index: usize, + }, + + /// A latch for scopes created on a non-rayon thread which will block to wait. + Blocking { latch: CountLockLatch }, +} - /// thread registry where `scope()` was executed. +struct ScopeBase<'scope> { + /// thread registry where `scope()` was executed or where `in_place_scope()` + /// should spawn jobs. registry: Arc<Registry>, /// if some job panicked, the error is stored here; it will be /// propagated to the one who created the scope panic: AtomicPtr<Box<dyn Any + Send + 'static>>, - /// latch to set when the counter drops to zero (and hence this scope is complete) - job_completed_latch: CountLatch, + /// latch to track job counts + job_completed_latch: ScopeLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. They're not actually required to be @@ -289,8 +306,8 @@ where R: Send, { in_worker(|owner_thread, _| { - let scope = Scope::<'scope>::new(owner_thread); - unsafe { scope.base.complete(owner_thread, || op(&scope)) } + let scope = Scope::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) }) } @@ -380,16 +397,89 @@ where R: Send, { in_worker(|owner_thread, _| { - let scope = ScopeFifo::<'scope>::new(owner_thread); - unsafe { scope.base.complete(owner_thread, || op(&scope)) } + let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); + scope.base.complete(Some(owner_thread), || op(&scope)) }) } +/// Creates a "fork-join" scope `s` and invokes the closure with a +/// reference to `s`. This closure can then spawn asynchronous tasks +/// into `s`. Those tasks may run asynchronously with respect to the +/// closure; they may themselves spawn additional tasks into `s`. When +/// the closure returns, it will block until all tasks that have been +/// spawned into `s` complete. +/// +/// This is just like `scope()` except the closure runs on the same thread +/// that calls `in_place_scope()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + do_in_place_scope(None, op) +} + +pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&Scope<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = Scope::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + +/// Creates a "fork-join" scope `s` with FIFO order, and invokes the +/// closure with a reference to `s`. This closure can then spawn +/// asynchronous tasks into `s`. Those tasks may run asynchronously with +/// respect to the closure; they may themselves spawn additional tasks +/// into `s`. When the closure returns, it will block until all tasks +/// that have been spawned into `s` complete. +/// +/// This is just like `scope_fifo()` except the closure runs on the same thread +/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the +/// thread pool. +/// +/// # Panics +/// +/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in +/// any of the spawned jobs, that panic will be propagated and the +/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is +/// non-deterministic which of their panic values will propagate. +/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will +/// execute, even if the spawning task should later panic. `in_place_scope_fifo()` +/// returns once all spawned jobs have completed, and any panics are +/// propagated at that point. +pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + do_in_place_scope_fifo(None, op) +} + +pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R +where + OP: FnOnce(&ScopeFifo<'scope>) -> R, +{ + let thread = unsafe { WorkerThread::current().as_ref() }; + let scope = ScopeFifo::<'scope>::new(thread, registry); + scope.base.complete(thread, || op(&scope)) +} + impl<'scope> Scope<'scope> { - fn new(owner_thread: &WorkerThread) -> Self { - Scope { - base: ScopeBase::new(owner_thread), - } + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + Scope { base } } /// Spawns a job into the fork-join scope `self`. This job will @@ -457,18 +547,18 @@ impl<'scope> Scope<'scope> { // Since `Scope` implements `Sync`, we can't be sure that we're still in a // thread of this pool, so we can't just push to the local worker thread. + // Also, this might be an in-place scope. self.base.registry.inject_or_push(job_ref); } } } impl<'scope> ScopeFifo<'scope> { - fn new(owner_thread: &WorkerThread) -> Self { - let num_threads = owner_thread.registry().num_threads(); - ScopeFifo { - base: ScopeBase::new(owner_thread), - fifos: (0..num_threads).map(|_| JobFifo::new()).collect(), - } + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let base = ScopeBase::new(owner, registry); + let num_threads = base.registry.num_threads(); + let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); + ScopeFifo { base, fifos } } /// Spawns a job into the fork-join scope `self`. This job will @@ -510,13 +600,17 @@ impl<'scope> ScopeFifo<'scope> { } impl<'scope> ScopeBase<'scope> { - /// Creates the base of a new scope for the given worker thread - fn new(owner_thread: &WorkerThread) -> Self { + /// Creates the base of a new scope for the given registry + fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self { + let registry = registry.unwrap_or_else(|| match owner { + Some(owner) => owner.registry(), + None => global_registry(), + }); + ScopeBase { - owner_thread_index: owner_thread.index(), - registry: owner_thread.registry().clone(), + registry: Arc::clone(registry), panic: AtomicPtr::new(ptr::null_mut()), - job_completed_latch: CountLatch::new(), + job_completed_latch: ScopeLatch::new(owner), marker: PhantomData, } } @@ -527,22 +621,19 @@ impl<'scope> ScopeBase<'scope> { /// Executes `func` as a job, either aborting or executing as /// appropriate. - /// - /// Unsafe because it must be executed on a worker thread. - unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R + fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R where FUNC: FnOnce() -> R, { let result = self.execute_job_closure(func); - self.steal_till_jobs_complete(owner_thread); + self.job_completed_latch.wait(owner); + self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated } /// Executes `func` as a job, either aborting or executing as /// appropriate. - /// - /// Unsafe because it must be executed on a worker thread. - unsafe fn execute_job<FUNC>(&self, func: FUNC) + fn execute_job<FUNC>(&self, func: FUNC) where FUNC: FnOnce(), { @@ -552,25 +643,24 @@ impl<'scope> ScopeBase<'scope> { /// Executes `func` as a job in scope. Adjusts the "job completed" /// counters and also catches any panic and stores it into /// `scope`. - /// - /// Unsafe because this must be executed on a worker thread. - unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> + fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R, { match unwind::halt_unwinding(func) { Ok(r) => { - self.job_completed_ok(); + self.job_completed_latch.set(); Some(r) } Err(err) => { self.job_panicked(err); + self.job_completed_latch.set(); None } } } - unsafe fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { + fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { // capture the first error we see, free the rest let nil = ptr::null_mut(); let mut err = Box::new(err); // box up the fat ptr @@ -581,36 +671,73 @@ impl<'scope> ScopeBase<'scope> { { mem::forget(err); // ownership now transferred into self.panic } - - self.job_completed_latch - .set_and_tickle_one(&self.registry, self.owner_thread_index); } - unsafe fn job_completed_ok(&self) { - self.job_completed_latch - .set_and_tickle_one(&self.registry, self.owner_thread_index); - } - - unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) { - // wait for job counter to reach 0: - owner_thread.wait_until(&self.job_completed_latch); - + fn maybe_propagate_panic(&self) { // propagate panic, if any occurred; at this point, all // outstanding jobs have completed, so we can use a relaxed // ordering: let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); if !panic.is_null() { - let value: Box<Box<dyn Any + Send + 'static>> = mem::transmute(panic); + let value = unsafe { Box::from_raw(panic) }; unwind::resume_unwinding(*value); } } } +impl ScopeLatch { + fn new(owner: Option<&WorkerThread>) -> Self { + match owner { + Some(owner) => ScopeLatch::Stealing { + latch: CountLatch::new(), + registry: Arc::clone(owner.registry()), + worker_index: owner.index(), + }, + None => ScopeLatch::Blocking { + latch: CountLockLatch::new(), + }, + } + } + + fn increment(&self) { + match self { + ScopeLatch::Stealing { latch, .. } => latch.increment(), + ScopeLatch::Blocking { latch } => latch.increment(), + } + } + + fn set(&self) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => latch.set_and_tickle_one(registry, *worker_index), + ScopeLatch::Blocking { latch } => latch.set(), + } + } + + fn wait(&self, owner: Option<&WorkerThread>) { + match self { + ScopeLatch::Stealing { + latch, + registry, + worker_index, + } => unsafe { + let owner = owner.expect("owner thread"); + debug_assert_eq!(registry.id(), owner.registry().id()); + debug_assert_eq!(*worker_index, owner.index()); + owner.wait_until(latch); + }, + ScopeLatch::Blocking { latch } => latch.wait(), + } + } +} + impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") .field("pool_id", &self.base.registry.id()) - .field("owner_thread_index", &self.base.owner_thread_index) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() @@ -622,9 +749,23 @@ impl<'scope> fmt::Debug for ScopeFifo<'scope> { fmt.debug_struct("ScopeFifo") .field("num_fifos", &self.fifos.len()) .field("pool_id", &self.base.registry.id()) - .field("owner_thread_index", &self.base.owner_thread_index) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() } } + +impl fmt::Debug for ScopeLatch { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ScopeLatch::Stealing { latch, .. } => fmt + .debug_tuple("ScopeLatch::Stealing") + .field(latch) + .finish(), + ScopeLatch::Blocking { latch } => fmt + .debug_tuple("ScopeLatch::Blocking") + .field(latch) + .finish(), + } + } +} diff --git a/src/scope/test.rs b/src/scope/test.rs index 8cb82b6..471d78e 100644 --- a/src/scope/test.rs +++ b/src/scope/test.rs @@ -120,13 +120,13 @@ fn random_tree1(depth: usize, rng: &mut XorShiftRng) -> Tree<u32> { let children = if depth == 0 { vec![] } else { - (0..rng.gen_range(0, 4)) // somewhere between 0 and 3 children at each level + (0..rng.gen_range(0..4)) // somewhere between 0 and 3 children at each level .map(|_| random_tree1(depth - 1, rng)) .collect() }; Tree { - value: rng.gen_range(0, 1_000_000), + value: rng.gen_range(0..1_000_000), children, } } diff --git a/src/thread_pool/mod.rs b/src/thread_pool/mod.rs index 2209f63..5edaedc 100644 --- a/src/thread_pool/mod.rs +++ b/src/thread_pool/mod.rs @@ -5,6 +5,7 @@ use crate::join; use crate::registry::{Registry, ThreadSpawn, WorkerThread}; +use crate::scope::{do_in_place_scope, do_in_place_scope_fifo}; use crate::spawn; #[allow(deprecated)] use crate::Configuration; @@ -221,6 +222,30 @@ impl ThreadPool { self.install(|| scope_fifo(op)) } + /// Creates a scope that spawns work into this thread-pool. + /// + /// See also: [the `in_place_scope()` function][in_place_scope]. + /// + /// [in_place_scope]: fn.in_place_scope.html + pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&Scope<'scope>) -> R, + { + do_in_place_scope(Some(&self.registry), op) + } + + /// Creates a scope that spawns work into this thread-pool in FIFO order. + /// + /// See also: [the `in_place_scope_fifo()` function][in_place_scope_fifo]. + /// + /// [in_place_scope_fifo]: fn.in_place_scope_fifo.html + pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: FnOnce(&ScopeFifo<'scope>) -> R, + { + do_in_place_scope_fifo(Some(&self.registry), op) + } + /// Spawns an asynchronous task in this thread-pool. This task will /// run in the implicit, global scope, which means that it may outlast /// the current stack frame -- therefore, it cannot capture any references diff --git a/src/thread_pool/test.rs b/src/thread_pool/test.rs index 8d1c90c..1510e37 100644 --- a/src/thread_pool/test.rs +++ b/src/thread_pool/test.rs @@ -336,3 +336,33 @@ fn nested_fifo_scopes() { }); assert_eq!(counter.into_inner(), pools.len()); } + +#[test] +fn in_place_scope_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} + +#[test] +fn in_place_scope_fifo_no_deadlock() { + let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let (tx, rx) = channel(); + let rx_ref = ℞ + pool.in_place_scope_fifo(move |s| { + // With regular scopes this closure would never run because this scope op + // itself would block the only worker thread. + s.spawn_fifo(move |_| { + tx.send(()).unwrap(); + }); + rx_ref.recv().unwrap(); + }); +} diff --git a/src/util.rs b/src/util.rs deleted file mode 100644 index 27d375a..0000000 --- a/src/util.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::mem; - -pub(super) fn leak<T>(v: T) -> &'static T { - unsafe { - let b = Box::new(v); - let p: *const T = &*b; - mem::forget(b); // leak our reference, so that `b` is never freed - &*p - } -} |