diff options
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 84 |
1 files changed, 59 insertions, 25 deletions
@@ -73,10 +73,9 @@ use std::fmt; use std::io; use std::marker::PhantomData; use std::str::FromStr; +use std::thread; #[macro_use] -mod log; -#[macro_use] mod private; mod broadcast; @@ -104,6 +103,12 @@ pub use self::thread_pool::current_thread_index; pub use self::thread_pool::ThreadPool; pub use self::thread_pool::{yield_local, yield_now, Yield}; +#[cfg(not(feature = "web_spin_lock"))] +use std::sync; + +#[cfg(feature = "web_spin_lock")] +use wasm_sync as sync; + use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; /// Returns the maximum number of threads that Rayon supports in a single thread-pool. @@ -148,6 +153,7 @@ pub struct ThreadPoolBuildError { #[derive(Debug)] enum ErrorKind { GlobalPoolAlreadyInitialized, + CurrentThreadAlreadyInPool, IOError(io::Error), } @@ -175,6 +181,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> { /// If RAYON_NUM_THREADS is invalid or zero will use the default. num_threads: usize, + /// The thread we're building *from* will also be part of the pool. + use_current_thread: bool, + /// Custom closure, if any, to handle a panic that we cannot propagate /// anywhere else. panic_handler: Option<Box<PanicHandler>>, @@ -228,6 +237,7 @@ impl Default for ThreadPoolBuilder { fn default() -> Self { ThreadPoolBuilder { num_threads: 0, + use_current_thread: false, panic_handler: None, get_thread_name: None, stack_size: None, @@ -284,12 +294,12 @@ where impl ThreadPoolBuilder { /// Creates a scoped `ThreadPool` initialized using this configuration. /// - /// This is a convenience function for building a pool using [`crossbeam::scope`] + /// This is a convenience function for building a pool using [`std::thread::scope`] /// to spawn threads in a [`spawn_handler`](#method.spawn_handler). /// The threads in this pool will start by calling `wrapper`, which should /// do initialization and continue by calling `ThreadBuilder::run()`. /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html + /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// # Examples /// @@ -324,28 +334,22 @@ impl ThreadPoolBuilder { W: Fn(ThreadBuilder) + Sync, // expected to call `run()` F: FnOnce(&ThreadPool) -> R, { - let result = crossbeam_utils::thread::scope(|scope| { - let wrapper = &wrapper; + std::thread::scope(|scope| { let pool = self .spawn_handler(|thread| { - let mut builder = scope.builder(); + let mut builder = std::thread::Builder::new(); if let Some(name) = thread.name() { builder = builder.name(name.to_string()); } if let Some(size) = thread.stack_size() { builder = builder.stack_size(size); } - builder.spawn(move |_| wrapper(thread))?; + builder.spawn_scoped(scope, || wrapper(thread))?; Ok(()) }) .build()?; Ok(with_pool(&pool)) - }); - - match result { - Ok(result) => result, - Err(err) => unwind::resume_unwinding(err), - } + }) } } @@ -354,13 +358,11 @@ impl<S> ThreadPoolBuilder<S> { /// /// Note that the threads will not exit until after the pool is dropped. It /// is up to the caller to wait for thread termination if that is important - /// for any invariants. For instance, threads created in [`crossbeam::scope`] + /// for any invariants. For instance, threads created in [`std::thread::scope`] /// will be joined before that scope returns, and this will block indefinitely /// if the pool is leaked. Furthermore, the global thread pool doesn't terminate /// until the entire process exits! /// - /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html - /// /// # Examples /// /// A minimal spawn handler just needs to call `run()` from an independent thread. @@ -409,6 +411,7 @@ impl<S> ThreadPoolBuilder<S> { /// or [`std::thread::scope`] introduced in Rust 1.63, which is encapsulated in /// [`build_scoped`](#method.build_scoped). /// + /// [`crossbeam::scope`]: https://docs.rs/crossbeam/0.8/crossbeam/fn.scope.html /// [`std::thread::scope`]: https://doc.rust-lang.org/std/thread/fn.scope.html /// /// ``` @@ -445,6 +448,7 @@ impl<S> ThreadPoolBuilder<S> { spawn_handler: CustomSpawn::new(spawn), // ..self num_threads: self.num_threads, + use_current_thread: self.use_current_thread, panic_handler: self.panic_handler, get_thread_name: self.get_thread_name, stack_size: self.stack_size, @@ -465,12 +469,18 @@ impl<S> ThreadPoolBuilder<S> { if self.num_threads > 0 { self.num_threads } else { + let default = || { + thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) + }; + match env::var("RAYON_NUM_THREADS") .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => return x, - Some(x) if x == 0 => return num_cpus::get(), + Some(x @ 1..) => return x, + Some(0) => return default(), _ => {} } @@ -479,8 +489,8 @@ impl<S> ThreadPoolBuilder<S> { .ok() .and_then(|s| usize::from_str(&s).ok()) { - Some(x) if x > 0 => x, - _ => num_cpus::get(), + Some(x @ 1..) => x, + _ => default(), } } } @@ -519,9 +529,8 @@ impl<S> ThreadPoolBuilder<S> { /// may change in the future, if you wish to rely on a fixed /// number of threads, you should use this function to specify /// that number. To reproduce the current default behavior, you - /// may wish to use the [`num_cpus` - /// crate](https://crates.io/crates/num_cpus) to query the number - /// of CPUs dynamically. + /// may wish to use [`std::thread::available_parallelism`] + /// to query the number of CPUs dynamically. /// /// **Old environment variable:** `RAYON_NUM_THREADS` is a one-to-one /// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment @@ -532,6 +541,24 @@ impl<S> ThreadPoolBuilder<S> { self } + /// Use the current thread as one of the threads in the pool. + /// + /// The current thread is guaranteed to be at index 0, and since the thread is not managed by + /// rayon, the spawn and exit handlers do not run for that thread. + /// + /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into + /// the thread-pool will generally not be picked up automatically by this thread unless you + /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. + /// + /// # Local thread-pools + /// + /// Using this in a local thread-pool means the registry will be leaked. In future versions + /// there might be a way of cleaning up the current-thread state. + pub fn use_current_thread(mut self) -> Self { + self.use_current_thread = true; + self + } + /// Returns a copy of the current panic handler. fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { self.panic_handler.take() @@ -734,18 +761,22 @@ impl ThreadPoolBuildError { const GLOBAL_POOL_ALREADY_INITIALIZED: &str = "The global thread pool has already been initialized."; +const CURRENT_THREAD_ALREADY_IN_POOL: &str = + "The current thread is already part of another thread pool."; + impl Error for ThreadPoolBuildError { #[allow(deprecated)] fn description(&self) -> &str { match self.kind { ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL, ErrorKind::IOError(ref e) => e.description(), } } fn source(&self) -> Option<&(dyn Error + 'static)> { match &self.kind { - ErrorKind::GlobalPoolAlreadyInitialized => None, + ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None, ErrorKind::IOError(e) => Some(e), } } @@ -754,6 +785,7 @@ impl Error for ThreadPoolBuildError { impl fmt::Display for ThreadPoolBuildError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.kind { + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f), ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), ErrorKind::IOError(e) => e.fmt(f), } @@ -771,6 +803,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ThreadPoolBuilder { ref num_threads, + ref use_current_thread, ref get_thread_name, ref panic_handler, ref stack_size, @@ -795,6 +828,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> { f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) + .field("use_current_thread", use_current_thread) .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) |