diff options
Diffstat (limited to 'src/registry.rs')
-rw-r--r-- | src/registry.rs | 177 |
1 files changed, 72 insertions, 105 deletions
diff --git a/src/registry.rs b/src/registry.rs index 5d56ac9..46cd22b 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -1,8 +1,7 @@ use crate::job::{JobFifo, JobRef, StackJob}; -use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch}; -use crate::log::Event::*; -use crate::log::Logger; +use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch}; use crate::sleep::Sleep; +use crate::sync::Mutex; use crate::unwind; use crate::{ ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder, @@ -17,7 +16,7 @@ use std::io; use std::mem; use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Once}; use std::thread; use std::usize; @@ -130,7 +129,6 @@ where } pub(super) struct Registry { - logger: Logger, thread_infos: Vec<ThreadInfo>, sleep: Sleep, injected_jobs: Injector<JobRef>, @@ -210,26 +208,7 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new() - .num_threads(1) - .spawn_handler(|thread| { - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. - let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - unsafe { - WorkerThread::set_current(worker_thread); - - // let registry know we are ready to do work - Latch::set(®istry.thread_infos[index].primed); - } - - Ok(()) - }); - + let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { return fallback_result; @@ -280,11 +259,9 @@ impl Registry { }) .unzip(); - let logger = Logger::new(n_threads); let registry = Arc::new(Registry { - logger: logger.clone(), thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(), - sleep: Sleep::new(logger, n_threads), + sleep: Sleep::new(n_threads), injected_jobs: Injector::new(), broadcasts: Mutex::new(broadcasts), terminate_count: AtomicUsize::new(1), @@ -305,6 +282,25 @@ impl Registry { stealer, index, }; + + if index == 0 && builder.use_current_thread { + if !WorkerThread::current().is_null() { + return Err(ThreadPoolBuildError::new( + ErrorKind::CurrentThreadAlreadyInPool, + )); + } + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); + + unsafe { + WorkerThread::set_current(worker_thread); + Latch::set(®istry.thread_infos[index].primed); + } + continue; + } + if let Err(e) = builder.get_spawn_handler().spawn(thread) { return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); } @@ -363,11 +359,6 @@ impl Registry { } } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.logger.log(event) - } - pub(super) fn num_threads(&self) -> usize { self.thread_infos.len() } @@ -426,8 +417,6 @@ impl Registry { /// whatever worker has nothing to do. Use this if you know that /// you are not on a worker of this registry. pub(super) fn inject(&self, injected_job: JobRef) { - self.log(|| JobsInjected { count: 1 }); - // It should not be possible for `state.terminate` to be true // here. It is only set to true when the user creates (and // drops) a `ThreadPool`; and, in that case, they cannot be @@ -442,22 +431,17 @@ impl Registry { let queue_was_empty = self.injected_jobs.is_empty(); self.injected_jobs.push(injected_job); - self.sleep.new_injected_jobs(usize::MAX, 1, queue_was_empty); + self.sleep.new_injected_jobs(1, queue_was_empty); } fn has_injected_job(&self) -> bool { !self.injected_jobs.is_empty() } - fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> { + fn pop_injected_job(&self) -> Option<JobRef> { loop { match self.injected_jobs.steal() { - Steal::Success(job) => { - self.log(|| JobUninjected { - worker: worker_index, - }); - return Some(job); - } + Steal::Success(job) => return Some(job), Steal::Empty => return None, Steal::Retry => {} } @@ -471,9 +455,6 @@ impl Registry { /// **Panics** if not given exactly as many jobs as there are threads. pub(super) fn inject_broadcast(&self, injected_jobs: impl ExactSizeIterator<Item = JobRef>) { assert_eq!(self.num_threads(), injected_jobs.len()); - self.log(|| JobBroadcast { - count: self.num_threads(), - }); { let broadcasts = self.broadcasts.lock().unwrap(); @@ -545,9 +526,6 @@ impl Registry { self.inject(job.as_job_ref()); job.latch.wait_and_reset(); // Make sure we can use the same latch again next time. - // flush accumulated logs as we exit the thread - self.logger.log(|| Flush); - job.into_result() }) } @@ -610,7 +588,7 @@ impl Registry { pub(super) fn terminate(&self) { if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 { for (i, thread_info) in self.thread_infos.iter().enumerate() { - unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; + unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) }; } } } @@ -640,10 +618,7 @@ struct ThreadInfo { /// This latch is *set* by the `terminate` method on the /// `Registry`, once the registry's main "terminate" counter /// reaches zero. - /// - /// NB. We use a `CountLatch` here because it has no lifetimes and is - /// meant for async use, but the count never gets higher than one. - terminate: CountLatch, + terminate: OnceLatch, /// the "stealer" half of the worker's deque stealer: Stealer<JobRef>, @@ -654,7 +629,7 @@ impl ThreadInfo { ThreadInfo { primed: LockLatch::new(), stopped: LockLatch::new(), - terminate: CountLatch::new(), + terminate: OnceLatch::new(), stealer, } } @@ -737,11 +712,6 @@ impl WorkerThread { &self.registry } - #[inline] - pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) { - self.registry.logger.log(event) - } - /// Our index amongst the worker threads (ranges from `0..self.num_threads()`). #[inline] pub(super) fn index(&self) -> usize { @@ -750,12 +720,9 @@ impl WorkerThread { #[inline] pub(super) unsafe fn push(&self, job: JobRef) { - self.log(|| JobPushed { worker: self.index }); let queue_was_empty = self.worker.is_empty(); self.worker.push(job); - self.registry - .sleep - .new_internal_jobs(self.index, 1, queue_was_empty); + self.registry.sleep.new_internal_jobs(1, queue_was_empty); } #[inline] @@ -777,7 +744,6 @@ impl WorkerThread { let popped_job = self.worker.pop(); if popped_job.is_some() { - self.log(|| JobPopped { worker: self.index }); return popped_job; } @@ -813,31 +779,51 @@ impl WorkerThread { // accesses, which would be *very bad* let abort_guard = unwind::AbortIfPanic; - let mut idle_state = self.registry.sleep.start_looking(self.index, latch); - while !latch.probe() { - if let Some(job) = self.find_work() { - self.registry.sleep.work_found(idle_state); + 'outer: while !latch.probe() { + // Check for local work *before* we start marking ourself idle, + // especially to avoid modifying shared sleep state. + if let Some(job) = self.take_local_job() { self.execute(job); - idle_state = self.registry.sleep.start_looking(self.index, latch); - } else { - self.registry - .sleep - .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + continue; + } + + let mut idle_state = self.registry.sleep.start_looking(self.index); + while !latch.probe() { + if let Some(job) = self.find_work() { + self.registry.sleep.work_found(); + self.execute(job); + // The job might have injected local work, so go back to the outer loop. + continue 'outer; + } else { + self.registry + .sleep + .no_work_found(&mut idle_state, latch, || self.has_injected_job()) + } } - } - // If we were sleepy, we are not anymore. We "found work" -- - // whatever the surrounding thread was doing before it had to - // wait. - self.registry.sleep.work_found(idle_state); + // If we were sleepy, we are not anymore. We "found work" -- + // whatever the surrounding thread was doing before it had to wait. + self.registry.sleep.work_found(); + break; + } - self.log(|| ThreadSawLatchSet { - worker: self.index, - latch_addr: latch.addr(), - }); mem::forget(abort_guard); // successful execution, do not abort } + unsafe fn wait_until_out_of_work(&self) { + debug_assert_eq!(self as *const _, WorkerThread::current()); + let registry = &*self.registry; + let index = self.index; + + self.wait_until(®istry.thread_infos[index].terminate); + + // Should not be any work left in our queue. + debug_assert!(self.take_local_job().is_none()); + + // Let registry know we are done + Latch::set(®istry.thread_infos[index].stopped); + } + fn find_work(&self) -> Option<JobRef> { // Try to find some work to do. We give preference first // to things in our local deque, then in other workers @@ -846,7 +832,7 @@ impl WorkerThread { // we take on something new. self.take_local_job() .or_else(|| self.steal()) - .or_else(|| self.registry.pop_injected_job(self.index)) + .or_else(|| self.registry.pop_injected_job()) } pub(super) fn yield_now(&self) -> Yield { @@ -898,13 +884,7 @@ impl WorkerThread { .find_map(|victim_index| { let victim = &thread_infos[victim_index]; match victim.stealer.steal() { - Steal::Success(job) => { - self.log(|| JobStolen { - worker: self.index, - victim: victim_index, - }); - Some(job) - } + Steal::Success(job) => Some(job), Steal::Empty => None, Steal::Retry => { retry = true; @@ -940,24 +920,11 @@ unsafe fn main_loop(thread: ThreadBuilder) { registry.catch_unwind(|| handler(index)); } - let my_terminate_latch = ®istry.thread_infos[index].terminate; - worker_thread.log(|| ThreadStart { - worker: index, - terminate_addr: my_terminate_latch.as_core_latch().addr(), - }); - worker_thread.wait_until(my_terminate_latch); - - // Should not be any work left in our queue. - debug_assert!(worker_thread.take_local_job().is_none()); - - // let registry know we are done - Latch::set(®istry.thread_infos[index].stopped); + worker_thread.wait_until_out_of_work(); // Normal termination, do not abort. mem::forget(abort_guard); - worker_thread.log(|| ThreadTerminate { worker: index }); - // Inform a user callback that we exited a thread. if let Some(ref handler) = registry.exit_handler { registry.catch_unwind(|| handler(index)); |