aboutsummaryrefslogtreecommitdiff
path: root/src/registry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/registry.rs')
-rw-r--r--src/registry.rs177
1 files changed, 72 insertions, 105 deletions
diff --git a/src/registry.rs b/src/registry.rs
index 5d56ac9..46cd22b 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -1,8 +1,7 @@
use crate::job::{JobFifo, JobRef, StackJob};
-use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
-use crate::log::Event::*;
-use crate::log::Logger;
+use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
use crate::sleep::Sleep;
+use crate::sync::Mutex;
use crate::unwind;
use crate::{
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
@@ -17,7 +16,7 @@ use std::io;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, Once};
+use std::sync::{Arc, Once};
use std::thread;
use std::usize;
@@ -130,7 +129,6 @@ where
}
pub(super) struct Registry {
- logger: Logger,
thread_infos: Vec<ThreadInfo>,
sleep: Sleep,
injected_jobs: Injector<JobRef>,
@@ -210,26 +208,7 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
// is stubbed out, and we won't have to change anything if they do add real threading.
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
if unsupported && WorkerThread::current().is_null() {
- let builder = ThreadPoolBuilder::new()
- .num_threads(1)
- .spawn_handler(|thread| {
- // Rather than starting a new thread, we're just taking over the current thread
- // *without* running the main loop, so we can still return from here.
- // The WorkerThread is leaked, but we never shutdown the global pool anyway.
- let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
- let registry = &*worker_thread.registry;
- let index = worker_thread.index;
-
- unsafe {
- WorkerThread::set_current(worker_thread);
-
- // let registry know we are ready to do work
- Latch::set(&registry.thread_infos[index].primed);
- }
-
- Ok(())
- });
-
+ let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread();
let fallback_result = Registry::new(builder);
if fallback_result.is_ok() {
return fallback_result;
@@ -280,11 +259,9 @@ impl Registry {
})
.unzip();
- let logger = Logger::new(n_threads);
let registry = Arc::new(Registry {
- logger: logger.clone(),
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
- sleep: Sleep::new(logger, n_threads),
+ sleep: Sleep::new(n_threads),
injected_jobs: Injector::new(),
broadcasts: Mutex::new(broadcasts),
terminate_count: AtomicUsize::new(1),
@@ -305,6 +282,25 @@ impl Registry {
stealer,
index,
};
+
+ if index == 0 && builder.use_current_thread {
+ if !WorkerThread::current().is_null() {
+ return Err(ThreadPoolBuildError::new(
+ ErrorKind::CurrentThreadAlreadyInPool,
+ ));
+ }
+ // Rather than starting a new thread, we're just taking over the current thread
+ // *without* running the main loop, so we can still return from here.
+ // The WorkerThread is leaked, but we never shutdown the global pool anyway.
+ let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread)));
+
+ unsafe {
+ WorkerThread::set_current(worker_thread);
+ Latch::set(&registry.thread_infos[index].primed);
+ }
+ continue;
+ }
+
if let Err(e) = builder.get_spawn_handler().spawn(thread) {
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
}
@@ -363,11 +359,6 @@ impl Registry {
}
}
- #[inline]
- pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
- self.logger.log(event)
- }
-
pub(super) fn num_threads(&self) -> usize {
self.thread_infos.len()
}
@@ -426,8 +417,6 @@ impl Registry {
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
pub(super) fn inject(&self, injected_job: JobRef) {
- self.log(|| JobsInjected { count: 1 });
-
// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
@@ -442,22 +431,17 @@ impl Registry {
let queue_was_empty = self.injected_jobs.is_empty();
self.injected_jobs.push(injected_job);
- self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty);
+ self.sleep.new_injected_jobs(1, queue_was_empty);
}
fn has_injected_job(&self) -> bool {
!self.injected_jobs.is_empty()
}
- fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
+ fn pop_injected_job(&self) -> Option<JobRef> {
loop {
match self.injected_jobs.steal() {
- Steal::Success(job) => {
- self.log(|| JobUninjected {
- worker: worker_index,
- });
- return Some(job);
- }
+ Steal::Success(job) => return Some(job),
Steal::Empty => return None,
Steal::Retry => {}
}
@@ -471,9 +455,6 @@ impl Registry {
/// **Panics** if not given exactly as many jobs as there are threads.
pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) {
assert_eq!(self.num_threads(), injected_jobs.len());
- self.log(|| JobBroadcast {
- count: self.num_threads(),
- });
{
let broadcasts = self.broadcasts.lock().unwrap();
@@ -545,9 +526,6 @@ impl Registry {
self.inject(job.as_job_ref());
job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
- // flush accumulated logs as we exit the thread
- self.logger.log(|| Flush);
-
job.into_result()
})
}
@@ -610,7 +588,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
- unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
+ unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
@@ -640,10 +618,7 @@ struct ThreadInfo {
/// This latch is *set* by the `terminate` method on the
/// `Registry`, once the registry's main "terminate" counter
/// reaches zero.
- ///
- /// NB. We use a `CountLatch` here because it has no lifetimes and is
- /// meant for async use, but the count never gets higher than one.
- terminate: CountLatch,
+ terminate: OnceLatch,
/// the "stealer" half of the worker's deque
stealer: Stealer<JobRef>,
@@ -654,7 +629,7 @@ impl ThreadInfo {
ThreadInfo {
primed: LockLatch::new(),
stopped: LockLatch::new(),
- terminate: CountLatch::new(),
+ terminate: OnceLatch::new(),
stealer,
}
}
@@ -737,11 +712,6 @@ impl WorkerThread {
&self.registry
}
- #[inline]
- pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
- self.registry.logger.log(event)
- }
-
/// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
#[inline]
pub(super) fn index(&self) -> usize {
@@ -750,12 +720,9 @@ impl WorkerThread {
#[inline]
pub(super) unsafe fn push(&self, job: JobRef) {
- self.log(|| JobPushed { worker: self.index });
let queue_was_empty = self.worker.is_empty();
self.worker.push(job);
- self.registry
- .sleep
- .new_internal_jobs(self.index, 1, queue_was_empty);
+ self.registry.sleep.new_internal_jobs(1, queue_was_empty);
}
#[inline]
@@ -777,7 +744,6 @@ impl WorkerThread {
let popped_job = self.worker.pop();
if popped_job.is_some() {
- self.log(|| JobPopped { worker: self.index });
return popped_job;
}
@@ -813,31 +779,51 @@ impl WorkerThread {
// accesses, which would be *very bad*
let abort_guard = unwind::AbortIfPanic;
- let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
- while !latch.probe() {
- if let Some(job) = self.find_work() {
- self.registry.sleep.work_found(idle_state);
+ 'outer: while !latch.probe() {
+ // Check for local work *before* we start marking ourself idle,
+ // especially to avoid modifying shared sleep state.
+ if let Some(job) = self.take_local_job() {
self.execute(job);
- idle_state = self.registry.sleep.start_looking(self.index, latch);
- } else {
- self.registry
- .sleep
- .no_work_found(&mut idle_state, latch, || self.has_injected_job())
+ continue;
+ }
+
+ let mut idle_state = self.registry.sleep.start_looking(self.index);
+ while !latch.probe() {
+ if let Some(job) = self.find_work() {
+ self.registry.sleep.work_found();
+ self.execute(job);
+ // The job might have injected local work, so go back to the outer loop.
+ continue 'outer;
+ } else {
+ self.registry
+ .sleep
+ .no_work_found(&mut idle_state, latch, || self.has_injected_job())
+ }
}
- }
- // If we were sleepy, we are not anymore. We "found work" --
- // whatever the surrounding thread was doing before it had to
- // wait.
- self.registry.sleep.work_found(idle_state);
+ // If we were sleepy, we are not anymore. We "found work" --
+ // whatever the surrounding thread was doing before it had to wait.
+ self.registry.sleep.work_found();
+ break;
+ }
- self.log(|| ThreadSawLatchSet {
- worker: self.index,
- latch_addr: latch.addr(),
- });
mem::forget(abort_guard); // successful execution, do not abort
}
+ unsafe fn wait_until_out_of_work(&self) {
+ debug_assert_eq!(self as *const _, WorkerThread::current());
+ let registry = &*self.registry;
+ let index = self.index;
+
+ self.wait_until(&registry.thread_infos[index].terminate);
+
+ // Should not be any work left in our queue.
+ debug_assert!(self.take_local_job().is_none());
+
+ // Let registry know we are done
+ Latch::set(&registry.thread_infos[index].stopped);
+ }
+
fn find_work(&self) -> Option<JobRef> {
// Try to find some work to do. We give preference first
// to things in our local deque, then in other workers
@@ -846,7 +832,7 @@ impl WorkerThread {
// we take on something new.
self.take_local_job()
.or_else(|| self.steal())
- .or_else(|| self.registry.pop_injected_job(self.index))
+ .or_else(|| self.registry.pop_injected_job())
}
pub(super) fn yield_now(&self) -> Yield {
@@ -898,13 +884,7 @@ impl WorkerThread {
.find_map(|victim_index| {
let victim = &thread_infos[victim_index];
match victim.stealer.steal() {
- Steal::Success(job) => {
- self.log(|| JobStolen {
- worker: self.index,
- victim: victim_index,
- });
- Some(job)
- }
+ Steal::Success(job) => Some(job),
Steal::Empty => None,
Steal::Retry => {
retry = true;
@@ -940,24 +920,11 @@ unsafe fn main_loop(thread: ThreadBuilder) {
registry.catch_unwind(|| handler(index));
}
- let my_terminate_latch = &registry.thread_infos[index].terminate;
- worker_thread.log(|| ThreadStart {
- worker: index,
- terminate_addr: my_terminate_latch.as_core_latch().addr(),
- });
- worker_thread.wait_until(my_terminate_latch);
-
- // Should not be any work left in our queue.
- debug_assert!(worker_thread.take_local_job().is_none());
-
- // let registry know we are done
- Latch::set(&registry.thread_infos[index].stopped);
+ worker_thread.wait_until_out_of_work();
// Normal termination, do not abort.
mem::forget(abort_guard);
- worker_thread.log(|| ThreadTerminate { worker: index });
-
// Inform a user callback that we exited a thread.
if let Some(ref handler) = registry.exit_handler {
registry.catch_unwind(|| handler(index));