aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs84
1 files changed, 59 insertions, 25 deletions
diff --git a/src/lib.rs b/src/lib.rs
index c9694ee..39df8a2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -73,10 +73,9 @@ use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::str::FromStr;
+use std::thread;
#[macro_use]
-mod log;
-#[macro_use]
mod private;
mod broadcast;
@@ -104,6 +103,12 @@ pub use self::thread_pool::current_thread_index;
pub use self::thread_pool::ThreadPool;
pub use self::thread_pool::{yield_local, yield_now, Yield};
+#[cfg(not(feature = "web_spin_lock"))]
+use std::sync;
+
+#[cfg(feature = "web_spin_lock")]
+use wasm_sync as sync;
+
use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
/// Returns the maximum number of threads that Rayon supports in a single thread-pool.
@@ -148,6 +153,7 @@ pub struct ThreadPoolBuildError {
#[derive(Debug)]
enum ErrorKind {
GlobalPoolAlreadyInitialized,
+ CurrentThreadAlreadyInPool,
IOError(io::Error),
}
@@ -175,6 +181,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
num_threads: usize,
+ /// The thread we're building *from* will also be part of the pool.
+ use_current_thread: bool,
+
/// Custom closure, if any, to handle a panic that we cannot propagate
/// anywhere else.
panic_handler: Option<Box<PanicHandler>>,
@@ -228,6 +237,7 @@ impl Default for ThreadPoolBuilder {
fn default() -> Self {
ThreadPoolBuilder {
num_threads: 0,
+ use_current_thread: false,
panic_handler: None,
get_thread_name: None,
stack_size: None,
@@ -284,12 +294,12 @@ where
impl ThreadPoolBuilder {
/// Creates a scoped `ThreadPool` initialized using this configuration.
///
- /// This is a convenience function for building a pool using [`crossbeam::scope`]
+ /// This is a convenience function for building a pool using [`std::thread::scope`]
/// to spawn threads in a [`spawn_handler`](#method.spawn_handler).
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
+ /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// # Examples
///
@@ -324,28 +334,22 @@ impl ThreadPoolBuilder {
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
F: FnOnce(&ThreadPool) -> R,
{
- let result = crossbeam_utils::thread::scope(|scope| {
- let wrapper = &wrapper;
+ std::thread::scope(|scope| {
let pool = self
.spawn_handler(|thread| {
- let mut builder = scope.builder();
+ let mut builder = std::thread::Builder::new();
if let Some(name) = thread.name() {
builder = builder.name(name.to_string());
}
if let Some(size) = thread.stack_size() {
builder = builder.stack_size(size);
}
- builder.spawn(move |_| wrapper(thread))?;
+ builder.spawn_scoped(scope, || wrapper(thread))?;
Ok(())
})
.build()?;
Ok(with_pool(&pool))
- });
-
- match result {
- Ok(result) => result,
- Err(err) => unwind::resume_unwinding(err),
- }
+ })
}
}
@@ -354,13 +358,11 @@ impl<S> ThreadPoolBuilder<S> {
///
/// Note that the threads will not exit until after the pool is dropped. It
/// is up to the caller to wait for thread termination if that is important
- /// for any invariants. For instance, threads created in [`crossbeam::scope`]
+ /// for any invariants. For instance, threads created in [`std::thread::scope`]
/// will be joined before that scope returns, and this will block indefinitely
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
///
- /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
- ///
/// # Examples
///
/// A minimal spawn handler just needs to call `run()` from an independent thread.
@@ -409,6 +411,7 @@ impl<S> ThreadPoolBuilder<S> {
/// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in
/// [`build_scoped`](#method.build_scoped).
///
+ /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html
/// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html
///
/// ```
@@ -445,6 +448,7 @@ impl<S> ThreadPoolBuilder<S> {
spawn_handler: CustomSpawn::new(spawn),
// ..self
num_threads: self.num_threads,
+ use_current_thread: self.use_current_thread,
panic_handler: self.panic_handler,
get_thread_name: self.get_thread_name,
stack_size: self.stack_size,
@@ -465,12 +469,18 @@ impl<S> ThreadPoolBuilder<S> {
if self.num_threads > 0 {
self.num_threads
} else {
+ let default = || {
+ thread::available_parallelism()
+ .map(|n| n.get())
+ .unwrap_or(1)
+ };
+
match env::var("RAYON_NUM_THREADS")
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
- Some(x) if x > 0 => return x,
- Some(x) if x == 0 => return num_cpus::get(),
+ Some(x @ 1..) => return x,
+ Some(0) => return default(),
_ => {}
}
@@ -479,8 +489,8 @@ impl<S> ThreadPoolBuilder<S> {
.ok()
.and_then(|s| usize::from_str(&s).ok())
{
- Some(x) if x > 0 => x,
- _ => num_cpus::get(),
+ Some(x @ 1..) => x,
+ _ => default(),
}
}
}
@@ -519,9 +529,8 @@ impl<S> ThreadPoolBuilder<S> {
/// may change in the future, if you wish to rely on a fixed
/// number of threads, you should use this function to specify
/// that number. To reproduce the current default behavior, you
- /// may wish to use the [`num_cpus`
- /// crate](https://crates.io/crates/num_cpus) to query the number
- /// of CPUs dynamically.
+ /// may wish to use [`std::thread::available_parallelism`]
+ /// to query the number of CPUs dynamically.
///
/// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
@@ -532,6 +541,24 @@ impl<S> ThreadPoolBuilder<S> {
self
}
+ /// Use the current thread as one of the threads in the pool.
+ ///
+ /// The current thread is guaranteed to be at index 0, and since the thread is not managed by
+ /// rayon, the spawn and exit handlers do not run for that thread.
+ ///
+ /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into
+ /// the thread-pool will generally not be picked up automatically by this thread unless you
+ /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`].
+ ///
+ /// # Local thread-pools
+ ///
+ /// Using this in a local thread-pool means the registry will be leaked. In future versions
+ /// there might be a way of cleaning up the current-thread state.
+ pub fn use_current_thread(mut self) -> Self {
+ self.use_current_thread = true;
+ self
+ }
+
/// Returns a copy of the current panic handler.
fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> {
self.panic_handler.take()
@@ -734,18 +761,22 @@ impl ThreadPoolBuildError {
const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
"The global thread pool has already been initialized.";
+const CURRENT_THREAD_ALREADY_IN_POOL: &str =
+ "The current thread is already part of another thread pool.";
+
impl Error for ThreadPoolBuildError {
#[allow(deprecated)]
fn description(&self) -> &str {
match self.kind {
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED,
+ ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL,
ErrorKind::IOError(ref e) => e.description(),
}
}
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
- ErrorKind::GlobalPoolAlreadyInitialized => None,
+ ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None,
ErrorKind::IOError(e) => Some(e),
}
}
@@ -754,6 +785,7 @@ impl Error for ThreadPoolBuildError {
impl fmt::Display for ThreadPoolBuildError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.kind {
+ ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f),
ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f),
ErrorKind::IOError(e) => e.fmt(f),
}
@@ -771,6 +803,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ThreadPoolBuilder {
ref num_threads,
+ ref use_current_thread,
ref get_thread_name,
ref panic_handler,
ref stack_size,
@@ -795,6 +828,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
+ .field("use_current_thread", use_current_thread)
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.field("stack_size", &stack_size)