diff options
author | Maxim Kartashev <maxim.kartashev@jetbrains.com> | 2024-01-10 12:19:12 +0400 |
---|---|---|
committer | Maxim Kartashev <maxim.kartashev@jetbrains.com> | 2024-01-24 18:32:40 +0400 |
commit | 3a6456e1f447c2f02361469a8f140ef5ec0e63f5 (patch) | |
tree | 8f68f2b434bbbe1aa6ef313de98f3dbb991dc637 | |
parent | 283beedc05820aa3a6fef31303f6318920d452aa (diff) | |
download | JetBrainsRuntime-3a6456e1f447c2f02361469a8f140ef5ec0e63f5.tar.gz |
8288899: java/util/concurrent/ExecutorService/CloseTest.java failed with "InterruptedException: sleep interrupted"
Reviewed-by: alanb
19 files changed, 3883 insertions, 2067 deletions
diff --git a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java index 3278d191074..7884a131ffb 100644 --- a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java +++ b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java @@ -524,6 +524,10 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> { return pending; } + final void initPending(int count) { + U.putInt(this, PENDING, count); + } + /** * Sets the pending count to the given value. * @@ -724,26 +728,27 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> { * processed. */ public final void helpComplete(int maxTasks) { - ForkJoinPool.WorkQueue q; Thread t; boolean owned; - if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + ForkJoinPool.WorkQueue q; Thread t; boolean internal; + if (internal = + (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else q = ForkJoinPool.commonQueue(); if (q != null && maxTasks > 0) - q.helpComplete(this, owned, maxTasks); + q.helpComplete(this, internal, maxTasks); } + // ForkJoinTask overrides /** * Supports ForkJoinTask exception propagation. */ @Override - final int trySetException(Throwable ex) { + final void onAuxExceptionSet(Throwable ex) { CountedCompleter<?> a = this, p = a; - do {} while (isExceptionalStatus(a.trySetThrown(ex)) && - a.onExceptionalCompletion(ex, p) && - (a = (p = a).completer) != null && a.status >= 0); - return status; + do {} while (a.onExceptionalCompletion(ex, p) && + (a = (p = a).completer) != null && + a.trySetThrown(ex)); } /** diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 5e698b1540f..121f5cefbd6 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -47,11 +47,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.function.Predicate; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; import jdk.internal.access.JavaUtilConcurrentFJPAccess; import jdk.internal.access.SharedSecrets; import jdk.internal.misc.Unsafe; @@ -187,27 +186,39 @@ public class ForkJoinPool extends AbstractExecutorService { * Implementation Overview * * This class and its nested classes provide the main - * functionality and control for a set of worker threads: - * Submissions from non-FJ threads enter into submission queues. - * Workers take these tasks and typically split them into subtasks - * that may be stolen by other workers. Work-stealing based on - * randomized scans generally leads to better throughput than - * "work dealing" in which producers assign tasks to idle threads, - * in part because threads that have finished other tasks before - * the signalled thread wakes up (which can be a long time) can - * take the task instead. Preference rules give first priority to - * processing tasks from their own queues (LIFO or FIFO, depending - * on mode), then to randomized FIFO steals of tasks in other - * queues. This framework began as vehicle for supporting - * tree-structured parallelism using work-stealing. Over time, - * its scalability advantages led to extensions and changes to - * better support more diverse usage contexts. Because most - * internal methods and nested classes are interrelated, their - * main rationale and descriptions are presented here; individual - * methods and nested classes contain only brief comments about - * details. There are a fair number of odd code constructions and - * design decisions for components that reside at the edge of Java - * vs JVM functionality. + * functionality and control for a set of worker threads. Because + * most internal methods and nested classes are interrelated, + * their main rationale and descriptions are presented here; + * individual methods and nested classes contain only brief + * comments about details. Broadly: submissions from non-FJ + * threads enter into submission queues. Workers take these tasks + * and typically split them into subtasks that may be stolen by + * other workers. Work-stealing based on randomized scans + * generally leads to better throughput than "work dealing" in + * which producers assign tasks to idle threads, in part because + * threads that have finished other tasks before the signalled + * thread wakes up (which can be a long time) can take the task + * instead. Preference rules give first priority to processing + * tasks from their own queues (LIFO or FIFO, depending on mode), + * then to randomized FIFO steals of tasks in other queues. This + * framework began as vehicle for supporting tree-structured + * parallelism using work-stealing. Over time, its scalability + * advantages led to extensions and changes to better support more + * diverse usage contexts. Here's a brief history of major + * revisions, each also with other minor features and changes. + * + * 1. Only handle recursively structured computational tasks + * 2. Async (FIFO) mode and striped submission queues + * 3. Completion-based tasks (mainly CountedCompleters) + * 4. CommonPool and parallelStream support + * 5. InterruptibleTasks for externally submitted tasks + * + * Most changes involve adaptions of base algorithms using + * combinations of static and dynamic bitwise mode settings (both + * here and in ForkJoinTask), and subclassing of ForkJoinTask. + * There are a fair number of odd code constructions and design + * decisions for components that reside at the edge of Java vs JVM + * functionality. * * WorkQueues * ========== @@ -238,16 +249,11 @@ public class ForkJoinPool extends AbstractExecutorService { * Nardelli, PPoPP 2013 * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an * analysis of memory ordering requirements in work-stealing - * algorithms similar to the one used here. We also use ordered, - * moded accesses and/or fences for other control, with modes - * reflecting the presence or absence of other contextual sync - * provided by atomic and/or volatile accesses. Some methods (or - * their primary loops) begin with an acquire fence or - * otherwise-unnecessary volatile read that amounts to an - * acquiring read of "this" to cover all fields (which is - * sometimes stronger than necessary, but less brittle). Some - * constructions are intentionally racy because they use read - * values as hints, not for correctness. + * algorithms similar to the one used here. We use per-operation + * ordered writes of various kinds for updates, but usually use + * explicit load fences for reads, to cover access of several + * fields of possibly several objects without further constraining + * read-by-read ordering. * * We also support a user mode in which local task processing is * in FIFO, not LIFO order, simply by using a local version of @@ -256,7 +262,7 @@ public class ForkJoinPool extends AbstractExecutorService { * increased contention among task producers and consumers. Also, * the same data structure (and class) is used for "submission * queues" (described below) holding externally submitted tasks, - * that differ only in that a lock (field "access"; see below) is + * that differ only in that a lock (using field "phase"; see below) is * required by external callers to push and pop tasks. * * Adding tasks then takes the form of a classic array push(task) @@ -267,8 +273,7 @@ public class ForkJoinPool extends AbstractExecutorService { * uses masking, not mod, for indexing a power-of-two-sized array, * enforces memory ordering, supports resizing, and possibly * signals waiting workers to start scanning (described below), - * which requires even internal usages to strictly order accesses - * (using a form of lock release). + * which requires stronger forms of order accesses. * * The pop operation (always performed by owner) is of the form: * if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null) @@ -289,7 +294,10 @@ public class ForkJoinPool extends AbstractExecutorService { * * Slot k must be read with an acquiring read, which it must * anyway to dereference and run the task if the (acquiring) * CAS succeeds, but uses an explicit acquire fence to support - * the following rechecks even if the CAS is not attempted. + * the following rechecks even if the CAS is not attempted. To + * more easily distinguish among kinds of CAS failures, we use + * the compareAndExchange version, and usually handle null + * returns (indicating contention) separately from others. * * * q.base may change between reading and using its value to * index the slot. To avoid trying to use the wrong t, the @@ -310,8 +318,8 @@ public class ForkJoinPool extends AbstractExecutorService { * q.base doesn't change on repeated reads of null t and when * no other alternatives apply, spin-wait for it to settle. To * reduce producing these kinds of stalls by other stealers, we - * encourage timely writes to indices using store fences when - * memory ordering is not already constrained by context. + * encourage timely writes to indices using otherwise + * unnecessarily strong writes. * * * The CAS may fail, in which case we may want to retry unless * there is too much contention. One goal is to balance and @@ -329,12 +337,12 @@ public class ForkJoinPool extends AbstractExecutorService { * when deciding whether to try to poll or repoll after a * failure. Both top and base may move independently, and both * lag updates to the underlying array. To reduce memory - * contention, when possible, non-owners avoid reading the - * "top" index at all, and instead use array reads, including - * one-ahead reads to check whether to repoll, relying on the - * fact that a non-empty queue does not have two null slots in - * a row, except in cases (resizes and shifts) that can be - * detected with a secondary recheck. + * contention, non-owners avoid reading the "top" when + * possible, by using one-ahead reads to check whether to + * repoll, relying on the fact that a non-empty queue does not + * have two null slots in a row, except in cases (resizes and + * shifts) that can be detected with a secondary recheck that + * is less likely to conflict with owner writes. * * The poll operations in q.poll(), scan(), helpJoin(), and * elsewhere differ with respect to whether other queues are @@ -362,14 +370,11 @@ public class ForkJoinPool extends AbstractExecutorService { * like workers except that they are restricted to executing local * tasks that they submitted (or when known, subtasks thereof). * Insertion of tasks in shared mode requires a lock. We use only - * a simple spinlock because submitters encountering a busy queue - * move to a different position to use or create other queues. - * They (spin) block when registering new queues, and less - * often in tryRemove and helpComplete. The lock needed for - * external queues is generalized (as field "access") for - * operations on owned queues that require a fully-fenced write - * (including push, parking status, and termination) in order to - * deal with Dekker-like signalling constructs described below. + * a simple spinlock (as one role of field "phase") because + * submitters encountering a busy queue move to a different + * position to use or create other queues. They (spin) block when + * registering new queues, or indirectly elsewhere, by revisiting + * later. * * Management * ========== @@ -395,29 +400,58 @@ public class ForkJoinPool extends AbstractExecutorService { * restrict maximum parallelism to (1<<15)-1 (which is far in * excess of normal operating range) to allow ids, counts, and * their negations (used for thresholding) to fit into 16bit - * subfields. Field "parallelism" holds the target parallelism - * (normally corresponding to pool size). It is needed (nearly) - * only in methods updating ctl, so is packed nearby. As of the - * current release, users can dynamically reset target - * parallelism, which is read once per update, so only slowly has - * an effect in creating threads or letting them time out and - * terminate when idle. - * - * Field "runState" holds lifetime status, atomically and - * monotonically setting SHUTDOWN, STOP, and finally TERMINATED - * bits. It is updated only via bitwise atomics (getAndBitwiseOr). + * subfields. + * + * Field "runState" and per-WorkQueue field "phase" play similar + * roles, as lockable, versioned counters. Field runState also + * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED). + * The version tags enable detection of state changes (by + * comparing two reads) modulo bit wraparound. The bit range in + * each case suffices for purposes of determining quiescence, + * termination, avoiding ABA-like errors, and signal control, most + * of which are ultimately based on at most 15bit ranges (due to + * 32767 max total workers). RunState updates do not need to be + * atomic with respect to ctl updates, but because they are not, + * some care is required to avoid stalls. The seqLock properties + * detect changes and conditionally upgrade to coordinate with + * updates. It is typically held for less than a dozen + * instructions unless the queue array is being resized, during + * which contention is rare. To be conservative, lockRunState is + * implemented as a spin/sleep loop. Here and elsewhere spin + * constants are short enough to apply even on systems with few + * available processors. In addition to checking pool status, + * reads of runState sometimes serve as acquire fences before + * reading other fields. + * + * Field "parallelism" holds the target parallelism (normally + * corresponding to pool size). Users can dynamically reset target + * parallelism, but is only accessed when signalling or awaiting + * work, so only slowly has an effect in creating threads or + * letting them time out and terminate when idle. * * Array "queues" holds references to WorkQueues. It is updated * (only during worker creation and termination) under the - * registrationLock, but is otherwise concurrently readable (often - * prefaced by a volatile read of mode to check termination, that - * is required anyway, and serves as an acquire fence). To - * simplify index-based operations, the array size is always a - * power of two, and all readers must tolerate null slots. Worker - * queues are at odd indices. Worker ids masked with SMASK match - * their index. Shared (submission) queues are at even - * indices. Grouping them together in this way simplifies and - * speeds up task scanning. + * runState lock. It is otherwise concurrently readable but reads + * for use in scans (see below) are always prefaced by a volatile + * read of runState (or equivalent constructions), ensuring that + * its state is current at the point it is used (which is all we + * require). To simplify index-based operations, the array size is + * always a power of two, and all readers must tolerate null + * slots. Worker queues are at odd indices. Worker phase ids + * masked with SMASK match their index. Shared (submission) queues + * are at even indices. Grouping them together in this way aids in + * task scanning: At top-level, both kinds of queues should be + * sampled with approximately the same probability, which is + * simpler if they are all in the same array. But we also need to + * identify what kind they are without looking at them, leading to + * this odd/even scheme. One disadvantage is that there are + * usually many fewer submission queues, so there can be many + * wasted probes (null slots). But this is still cheaper than + * alternatives. Other loops over the queues array vary in origin + * and stride depending on whether they cover only submission + * (even) or worker (odd) queues or both, and whether they require + * randomness (in which case cyclically exhaustive strides may be + * used). * * All worker thread creation is on-demand, triggered by task * submissions, replacement of terminated workers, and/or @@ -429,7 +463,11 @@ public class ForkJoinPool extends AbstractExecutorService { * one source of some of the messy code constructions here). In * essence, the queues array serves as a weak reference * mechanism. In particular, the stack top subfield of ctl stores - * indices, not references. + * indices, not references. Operations on queues obtained from + * these indices remain valid (with at most some unnecessary extra + * work) even if an underlying worker failed and was replaced by + * another at the same index. During termination, worker queue + * array updates are disabled. * * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we * cannot let workers spin indefinitely scanning for tasks when @@ -438,28 +476,30 @@ public class ForkJoinPool extends AbstractExecutorService { * other hand, we must quickly prod them into action when new * tasks are submitted or generated. These latencies are mainly a * function of JVM park/unpark (and underlying OS) performance, - * which can be slow and variable. In many usages, ramp-up time + * which can be slow and variable (even though usages are + * streamlined as much as possible). In many usages, ramp-up time * is the main limiting factor in overall performance, which is * compounded at program start-up by JIT compilation and * allocation. On the other hand, throughput degrades when too - * many threads poll for too few tasks. + * many threads poll for too few tasks. (See below.) * * The "ctl" field atomically maintains total and "released" * worker counts, plus the head of the available worker queue * (actually stack, represented by the lower 32bit subfield of * ctl). Released workers are those known to be scanning for - * and/or running tasks. Unreleased ("available") workers are - * recorded in the ctl stack. These workers are made eligible for - * signalling by enqueuing in ctl (see method awaitWork). The - * "queue" is a form of Treiber stack. This is ideal for - * activating threads in most-recently used order, and improves - * performance and locality, outweighing the disadvantages of - * being prone to contention and inability to release a worker - * unless it is topmost on stack. The top stack state holds the - * value of the "phase" field of the worker: its index and status, - * plus a version counter that, in addition to the count subfields - * (also serving as version stamps) provide protection against - * Treiber stack ABA effects. + * and/or running tasks (we cannot accurately determine + * which). Unreleased ("available") workers are recorded in the + * ctl stack. These workers are made eligible for signalling by + * enqueuing in ctl (see method runWorker). This "queue" is a + * form of Treiber stack. This is ideal for activating threads in + * most-recently used order, and improves performance and + * locality, outweighing the disadvantages of being prone to + * contention and inability to release a worker unless it is + * topmost on stack. The top stack state holds the value of the + * "phase" field of the worker: its index and status, plus a + * version counter that, in addition to the count subfields (also + * serving as version stamps) provide protection against Treiber + * stack ABA effects. * * Creating workers. To create a worker, we pre-increment counts * (serving as a reservation), and attempt to construct a @@ -473,16 +513,14 @@ public class ForkJoinPool extends AbstractExecutorService { * workers. If exceptional, the exception is propagated, generally * to some external caller. * - * WorkQueue field "phase" is used by both workers and the pool to - * manage and track whether a worker is unsignalled (possibly - * blocked waiting for a signal), conveniently using the sign bit - * to check. When a worker is enqueued its phase field is set - * negative. Note that phase field updates lag queue CAS releases; - * seeing a negative phase does not guarantee that the worker is - * available (and so is never checked in this way). When queued, - * the lower 16 bits of its phase must hold its pool index. So we - * place the index there upon initialization and never modify - * these bits. + * WorkQueue field "phase" encodes the queue array id in lower + * bits, and otherwise acts similarly to the pool runState field: + * The "IDLE" bit is clear while active (either a released worker + * or a locked external queue), with other bits serving as a + * version counter to distinguish changes across multiple reads. + * Note that phase field updates lag queue CAS releases; seeing a + * non-idle phase does not guarantee that the worker is available + * (and so is never checked in this way). * * The ctl field also serves as the basis for memory * synchronization surrounding activation. This uses a more @@ -498,178 +536,230 @@ public class ForkJoinPool extends AbstractExecutorService { * workers to scan for tasks. Method signalWork and its callers * try to approximate the unattainable goal of having the right * number of workers activated for the tasks at hand, but must err - * on the side of too many workers vs too few to avoid stalls. If - * computations are purely tree structured, it suffices for every - * worker to activate another when it pushes a task into an empty - * queue, resulting in O(log(#threads)) steps to full activation. - * (To reduce resource usages in some cases, at the expense of - * slower startup in others, activation of an idle thread is - * preferred over creating a new one, here and elsewhere.) If - * instead, tasks come in serially from only a single producer, - * each worker taking its first (since the last activation) task - * from a queue should signal another if there are more tasks in - * that queue. This is equivalent to, but generally faster than, - * arranging the stealer take two tasks, re-pushing one on its own - * queue, and signalling (because its queue is empty), also - * resulting in logarithmic full activation time. Because we don't - * know about usage patterns (or most commonly, mixtures), we use - * both approaches. Together these are minimally necessary for - * maintaining liveness. However, they do not account for the fact - * that when tasks are short-lived, signals are unnecessary - * because workers will already be scanning for new tasks without - * the need of new signals. We track these cases (variable - * "prevSrc" in scan() and related methods) to avoid some - * unnecessary signals and scans. However, signal contention and - * overhead effects may still occur during ramp-up, ramp-down, and - * small computations involving only a few workers. + * on the side of too many workers vs too few to avoid stalls: + * + * * If computations are purely tree structured, it suffices for + * every worker to activate another when it pushes a task into + * an empty queue, resulting in O(log(#threads)) steps to full + * activation. Emptiness must be conservatively approximated, + * sometimes resulting in unnecessary signals. Also, to reduce + * resource usages in some cases, at the expense of slower + * startup in others, activation of an idle thread is preferred + * over creating a new one, here and elsewhere. + * + * * If instead, tasks come in serially from only a single + * producer, each worker taking its first (since the last + * activation) task from a queue should propagate a signal if + * there are more tasks in that queue. This is equivalent to, + * but generally faster than, arranging the stealer take + * multiple tasks, re-pushing one or more on its own queue, and + * signalling (because its queue is empty), also resulting in + * logarithmic full activation time + * + * * Because we don't know about usage patterns (or most commonly, + * mixtures), we use both approaches, which present even more + * opportunities to over-signal. Note that in either of these + * contexts, signals may be (and often are) unnecessary because + * active workers continue scanning after running tasks without + * the need to be signalled (which is one reason work stealing + * is often faster than alternatives), so additional workers + * aren't needed. But there is no efficient way to detect this. + * + * * For rapidly branching tasks that require full pool resources, + * oversignalling is OK, because signalWork will soon have no + * more workers to create or reactivate. But for others (mainly + * externally submitted tasks), overprovisioning may cause very + * noticeable slowdowns due to contention and resource + * wastage. All remedies are intrinsically heuristic. We use a + * strategy that works well in most cases: We track "sources" + * (queue ids) of non-empty (usually polled) queues while + * scanning. These are maintained in the "source" field of + * WorkQueues for use in method helpJoin and elsewhere (see + * below). We also maintain them as arguments/results of + * top-level polls (argument "window" in method scan, with setup + * in method runWorker) as an encoded sliding window of current + * and previous two sources (or INVALID_ID if none), and stop + * signalling when all were from the same source. Also, retries + * are suppressed on CAS failures by newly activated workers, + * which serves as a form of admission control. These + * mechanisms may result in transiently too few workers, but + * once workers poll from a new source, they rapidly reactivate + * others. + * + * * Despite these, signal contention and overhead effects still + * occur during ramp-up and ramp-down of small computations. * * Scanning. Method scan performs top-level scanning for (and * execution of) tasks by polling a pseudo-random permutation of - * the array (by starting at a random index, and using a constant - * cyclically exhaustive stride.) It uses the same basic polling + * the array (by starting at a given index, and using a constant + * cyclically exhaustive stride.) It uses the same basic polling * method as WorkQueue.poll(), but restarts with a different - * permutation on each invocation. (Non-top-level scans; for - * example in helpJoin, use simpler and faster linear probes - * because they do not systematically contend with top-level - * scans.) The pseudorandom generator need not have high-quality - * statistical properties in the long term. We use Marsaglia - * XorShifts, seeded with the Weyl sequence from ThreadLocalRandom - * probes, which are cheap and suffice. Scans do not otherwise - * explicitly take into account core affinities, loads, cache - * localities, etc, However, they do exploit temporal locality - * (which usually approximates these) by preferring to re-poll - * from the same queue (using method tryPoll()) after a successful - * poll before trying others (see method topLevelExec), which also - * reduces bookkeeping and scanning overhead. This also reduces + * permutation on each invocation. The pseudorandom generator + * need not have high-quality statistical properties in the long + * term. We use Marsaglia XorShifts, seeded with the Weyl sequence + * from ThreadLocalRandom probes, which are cheap and + * suffice. Scans do not otherwise explicitly take into account + * core affinities, loads, cache localities, etc, However, they do + * exploit temporal locality (which usually approximates these) by + * preferring to re-poll from the same queue (either in method + * tryPoll() or scan) after a successful poll before trying others + * (see method topLevelExec), which also reduces bookkeeping, + * cache traffic, and scanning overhead. But it also reduces * fairness, which is partially counteracted by giving up on * contention. * * Deactivation. When method scan indicates that no tasks are - * found by a worker, it deactivates (see awaitWork). Note that - * not finding tasks doesn't mean that there won't soon be + * found by a worker, it tries to deactivate (in runWorker). Note + * that not finding tasks doesn't mean that there won't soon be * some. Further, a scan may give up under contention, returning * even without knowing whether any tasks are still present, which - * is OK, given the above signalling rules that will eventually - * maintain progress. Blocking and unblocking via park/unpark can - * cause serious slowdowns when tasks are rapidly but irregularly - * generated (which is often due to garbage collectors and other - * activities). One way to ameliorate is for workers to rescan - * multiple times, even when there are unlikely to be tasks. But - * this causes enough memory and CAS contention to prefer using - * quieter spinwaits in awaitWork; currently set to small values - * that only cover near-miss scenarios for deactivate vs activate - * races. Because idle workers are often not yet blocked (via - * LockSupport.park), we use the WorkQueue access field to - * advertise that a waiter actually needs unparking upon signal. - * - * When idle workers are not continually woken up, the count - * fields in ctl allow efficient and accurate discovery of - * quiescent states (i.e., when all workers are idle) after - * deactivation. However, this voting mechanism alone does not - * guarantee that a pool can become dormant (quiesced or - * terminated), because external racing producers do not vote, and - * can asynchronously submit new tasks. To deal with this, the - * final unparked thread (in awaitWork) scans external queues to - * check for tasks that could have been added during a race window - * that would not be accompanied by a signal, in which case - * re-activating itself (or any other worker) to recheck. The same - * sets of checks are used in tryTerminate, to correctly trigger - * delayed termination (shutDown, followed by quiescence) in the - * presence of racing submissions. In all cases, the notion of the - * "final" unparked thread is an approximation, because new - * workers could be in the process of being constructed, which - * occasionally adds some extra unnecessary processing. - * - * Shutdown and Termination. A call to shutdownNow invokes - * tryTerminate to atomically set a mode bit. The calling thread, - * as well as every other worker thereafter terminating, helps - * terminate others by cancelling their unprocessed tasks, and - * interrupting other workers. Calls to non-abrupt shutdown() - * preface this by checking isQuiescent before triggering the - * "STOP" phase of termination. During termination, workers are - * stopped using all three of (often in parallel): releasing via - * ctl (method reactivate), interrupts, and cancelling tasks that - * will cause workers to not find work and exit. To support this, - * worker references not removed from the queues array during - * termination. It is possible for late thread creations to still - * be in progress after a quiescent termination reports terminated - * status, but they will also immediately terminate. To conform to - * ExecutorService invoke, invokeAll, and invokeAny specs, we must - * track pool status while waiting in ForkJoinTask.awaitDone, and - * interrupt interruptible callers on termination, while also - * avoiding cancelling other tasks that are normally completing - * during quiescent termination. This is tracked by recording - * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag - * argument to joining methods. + * is OK given, a secondary check (in awaitWork) needed to cover + * deactivation/signal races. Blocking and unblocking via + * park/unpark can cause serious slowdowns when tasks are rapidly + * but irregularly generated (which is often due to garbage + * collectors and other activities). One way to ameliorate is for + * workers to rescan multiple times, even when there are unlikely + * to be tasks. But this causes enough memory traffic and CAS + * contention to prefer using quieter short spinwaits in awaitWork + * and elsewhere. Those in awaitWork are set to small values that + * only cover near-miss scenarios for inactivate/activate races. + * Because idle workers are often not yet blocked (parked), we use + * the WorkQueue parker field to advertise that a waiter actually + * needs unparking upon signal. + * + * Quiescence. Workers scan looking for work, giving up when they + * don't find any, without being sure that none are available. + * However, some required functionality relies on consensus about + * quiescence (also termination, discussed below). The count + * fields in ctl allow accurate discovery of states in which all + * workers are idle. However, because external (asynchronous) + * submitters are not part of this vote, these mechanisms + * themselves do not guarantee that the pool is in a quiescent + * state with respect to methods isQuiescent, shutdown (which + * begins termination when quiescent), helpQuiesce, and indirectly + * others including tryCompensate. Method quiescent() is + * used in all of these contexts. It provides checks that all + * workers are idle and there are no submissions that they could + * poll if they were not idle, retrying on inconsistent reads of + * queues and using the runState seqLock to retry on queue array + * updates. (It also reports quiescence if the pool is + * terminating.) A true report means only that there was a moment + * at which quiescence held. False negatives are inevitable (for + * example when queues indices lag updates, as described above), + * which is accommodated when (tentatively) idle by scanning for + * work etc, and then re-invoking. This includes cases in which + * the final unparked thread (in awaitWork) uses quiescent() + * to check for tasks that could have been added during a race + * window that would not be accompanied by a signal, in which case + * re-activating itself (or any other worker) to rescan. Method + * helpQuiesce acts similarly but cannot rely on ctl counts to + * determine that all workers are inactive because the caller and + * any others executing helpQuiesce are not included in counts. + * + * Termination. A call to shutdownNow invokes tryTerminate to + * atomically set a runState mode bit. However, the process of + * termination is intrinsically non-atomic. The calling thread, as + * well as other workers thereafter terminating help cancel queued + * tasks and interrupt other workers. These actions race with + * unterminated workers. By default, workers check for + * termination only when accessing pool state. This may take a + * while but suffices for structured computational tasks. But not + * necessarily for others. Class InterruptibleTask (see below) + * further arranges runState checks before executing task bodies, + * and ensures interrupts while terminating. Even so, there are no + * guarantees after an abrupt shutdown that remaining tasks + * complete normally or exceptionally or are cancelled. + * Termination may fail to complete if running tasks ignore both + * task status and interrupts and/or produce more tasks after + * others that could cancel them have exited. * * Trimming workers. To release resources after periods of lack of * use, a worker starting to wait when the pool is quiescent will * time out and terminate if the pool has remained quiescent for - * period given by field keepAlive. + * period given by field keepAlive (default 60sec), which applies + * to the first timeout of a fully populated pool. Subsequent (or + * other) cases use delays such that, if still quiescent, all will + * be released before one additional keepAlive unit elapses. * * Joining Tasks * ============= * - * Normally, the first option when joining a task that is not done - * is to try to take it from local queue and run it. Otherwise, - * any of several actions may be taken when one worker is waiting - * to join a task stolen (or always held) by another. Because we - * are multiplexing many tasks on to a pool of workers, we can't - * always just let them block (as in Thread.join). We also cannot - * just reassign the joiner's run-time stack with another and - * replace it later, which would be a form of "continuation", that - * even if possible is not necessarily a good idea since we may - * need both an unblocked task and its continuation to progress. - * Instead we combine two tactics: - * - * Helping: Arranging for the joiner to execute some task that it - * could be running if the steal had not occurred. - * - * Compensating: Unless there are already enough live threads, - * method tryCompensate() may create or re-activate a spare - * thread to compensate for blocked joiners until they unblock. - * - * A third form (implemented via tryRemove) amounts to helping a - * hypothetical compensator: If we can readily tell that a - * possible action of a compensator is to steal and execute the - * task being joined, the joining thread can do so directly, - * without the need for a compensation thread; although with a - * possibility of reduced parallelism because of a transient gap - * in the queue array that stalls stealers. - * - * Other intermediate forms available for specific task types (for - * example helpAsyncBlocker) often avoid or postpone the need for - * blocking or compensation. - * - * The ManagedBlocker extension API can't use helping so relies - * only on compensation in method awaitBlocker. + * The "Join" part of ForkJoinPools consists of a set of + * mechanisms that sometimes or always (depending on the kind of + * task) avoid context switching or adding worker threads when one + * task would otherwise be blocked waiting for completion of + * another, basically, just by running that task or one of its + * subtasks if not already taken. These mechanics are disabled for + * InterruptibleTasks, that guarantee that callers do not executed + * submitted tasks. + * + * The basic structure of joining is an extended spin/block scheme + * in which workers check for task completions status between + * steps to find other work, until relevant pool state stabilizes + * enough to believe that no such tasks are available, at which + * point blocking. This is usually a good choice of when to block + * that would otherwise be harder to approximate. + * + * These forms of helping may increase stack space usage, but that + * space is bounded in tree/dag structured procedurally parallel + * designs to be no more than that if a task were executed only by + * the joining thread. This is arranged by associated task + * subclasses that also help detect and control the ways in which + * this may occur. * - * The algorithm in helpJoin entails a form of "linear helping". - * Each worker records (in field "source") a reference to the - * queue from which it last stole a task. The scan in method + * Normally, the first option when joining a task that is not done + * is to try to take it from the local queue and run it. Method + * tryRemoveAndExec tries to do so. For tasks with any form of + * subtasks that must be completed first, we try to locate these + * subtasks and run them as well. This is easy when local, but + * when stolen, steal-backs are restricted to the same rules as + * stealing (polling), which requires additional bookkeeping and + * scanning. This cost is still very much worthwhile because of + * its impact on task scheduling and resource control. + * + * The two methods for finding and executing subtasks vary in + * details. The algorithm in helpJoin entails a form of "linear + * helping". Each worker records (in field "source") the index of + * the internal queue from which it last stole a task. (Note: + * because chains cannot include even-numbered external queues, + * they are ignored, and 0 is an OK default.) The scan in method * helpJoin uses these markers to try to find a worker to help - * (i.e., steal back a task from and execute it) that could hasten - * completion of the actively joined task. Thus, the joiner - * executes a task that would be on its own local deque if the - * to-be-joined task had not been stolen. This is a conservative - * variant of the approach described in Wagner & Calder - * "Leapfrogging: a portable technique for implementing efficient - * futures" SIGPLAN Notices, 1993 + * (i.e., steal back a task from and execute it) that could make + * progress toward completion of the actively joined task. Thus, + * the joiner executes a task that would be on its own local deque + * if the to-be-joined task had not been stolen. This is a + * conservative variant of the approach described in Wagner & + * Calder "Leapfrogging: a portable technique for implementing + * efficient futures" SIGPLAN Notices, 1993 * (http://portal.acm.org/citation.cfm?id=155354). It differs * mainly in that we only record queues, not full dependency - * links. This requires a linear scan of the queues array to - * locate stealers, but isolates cost to when it is needed, rather - * than adding to per-task overhead. For CountedCompleters, the + * links. This requires a linear scan of the queues to locate + * stealers, but isolates cost to when it is needed, rather than + * adding to per-task overhead. For CountedCompleters, the * analogous method helpComplete doesn't need stealer-tracking, - * but requires a similar check of completion chains. + * but requires a similar (but simpler) check of completion + * chains. * * In either case, searches can fail to locate stealers when - * stalls delay recording sources. We avoid some of these cases by - * using snapshotted values of ctl as a check that the numbers of - * workers are not changing. But even when accurately identified, - * stealers might not ever produce a task that the joiner can in - * turn help with. So, compensation is tried upon failure to find - * tasks to run. + * stalls delay recording sources or issuing subtasks. We avoid + * some of these cases by using snapshotted values of ctl as a + * check that the numbers of workers are not changing, along with + * rescans to deal with contention and stalls. But even when + * accurately identified, stealers might not ever produce a task + * that the joiner can in turn help with. + * + * Related method helpAsyncBlocker does not directly rely on + * subtask structure, but instead avoids or postpones blocking of + * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by + * executing other asyncs that can be processed in any order. + * This is currently invoked only in non-join-based blocking + * contexts from classes CompletableFuture and + * SubmissionPublisher, that could be further generalized. + * + * When any of the above fail to avoid blocking, we rely on + * "compensation" -- an indirect form of context switching that + * either activates an existing worker to take the place of the + * blocked one, or expands the number of workers. * * Compensation does not by default aim to keep exactly the target * parallelism number of unblocked threads running at any given @@ -677,12 +767,28 @@ public class ForkJoinPool extends AbstractExecutorService { * compensations for any blocked join. However, in practice, the * vast majority of blockages are transient byproducts of GC and * other JVM or OS activities that are made worse by replacement - * when they cause longer-term oversubscription. Rather than - * impose arbitrary policies, we allow users to override the - * default of only adding threads upon apparent starvation. The - * compensation mechanism may also be bounded. Bounds for the - * commonPool better enable JVMs to cope with programming errors - * and abuse before running out of resources to do so. + * by causing longer-term oversubscription. These are inevitable + * without (unobtainably) perfect information about whether worker + * creation is actually necessary. False alarms are common enough + * to negatively impact performance, so compensation is by default + * attempted only when it appears possible that the pool could + * stall due to lack of any unblocked workers. However, we allow + * users to override defaults using the long form of the + * ForkJoinPool constructor. The compensation mechanism may also + * be bounded. Bounds for the commonPool better enable JVMs to + * cope with programming errors and abuse before running out of + * resources to do so. + * + * The ManagedBlocker extension API can't use helping so relies + * only on compensation in method awaitBlocker. This API was + * designed to highlight the uncertainty of compensation decisions + * by requiring implementation of method isReleasable to abort + * compensation during attempts to obtain a stable snapshot. But + * users now rely upon the fact that if isReleasable always + * returns false, the API can be used to obtain precautionary + * compensation, which is sometimes the only reasonable option + * when running unknown code in tasks; which is now supported more + * simply (see method beginCompensatedBlock). * * Common Pool * =========== @@ -691,32 +797,31 @@ public class ForkJoinPool extends AbstractExecutorService { * initialization. Since it (or any other created pool) need * never be used, we minimize initial construction overhead and * footprint to the setup of about a dozen fields, although with - * some System property parsing and with security processing that - * takes far longer than the actual construction when - * SecurityManagers are used or properties are set. The common - * pool is distinguished internally by having both a null - * workerNamePrefix and ISCOMMON config bit set, along with - * PRESET_SIZE set if parallelism was configured by system - * property. - * - * When external threads use ForkJoinTask.fork for the common - * pool, they can perform subtask processing (see helpComplete and - * related methods) upon joins. This caller-helps policy makes it - * sensible to set common pool parallelism level to one (or more) - * less than the total number of available cores, or even zero for - * pure caller-runs. For the sake of ExecutorService specs, we can - * only do this for tasks entered via fork, not submit. We track - * this using a task status bit (markPoolSubmission). In all - * other cases, external threads waiting for joins first check the - * common pool for their task, which fails quickly if the caller - * did not fork to common pool. + * some System property parsing and security processing that takes + * far longer than the actual construction when SecurityManagers + * are used or properties are set. The common pool is + * distinguished by having a null workerNamePrefix (which is an + * odd convention, but avoids the need to decode status in factory + * classes). It also has PRESET_SIZE config set if parallelism + * was configured by system property. + * + * When external threads use the common pool, they can perform + * subtask processing (see helpComplete and related methods) upon + * joins, unless they are submitted using ExecutorService + * submission methods, which implicitly disallow this. This + * caller-helps policy makes it sensible to set common pool + * parallelism level to one (or more) less than the total number + * of available cores, or even zero for pure caller-runs. External + * threads waiting for joins first check the common pool for their + * task, which fails quickly if the caller did not fork to common + * pool. * * Guarantees for common pool parallelism zero are limited to * tasks that are joined by their callers in a tree-structured * fashion or use CountedCompleters (as is true for jdk * parallelStreams). Support infiltrates several methods, - * including those that retry helping steps or spin until we are - * sure that none apply if there are no workers. + * including those that retry helping steps until we are sure that + * none apply if there are no workers. * * As a more appropriate default in managed environments, unless * overridden by system properties, we use workers of subclass @@ -727,55 +832,82 @@ public class ForkJoinPool extends AbstractExecutorService { * may be JVM-dependent and must access particular Thread class * fields to achieve this effect. * - * Interrupt handling - * ================== - * - * The framework is designed to manage task cancellation - * (ForkJoinTask.cancel) independently from the interrupt status - * of threads running tasks. (See the public ForkJoinTask - * documentation for rationale.) Interrupts are issued only in - * tryTerminate, when workers should be terminating and tasks - * should be cancelled anyway. Interrupts are cleared only when - * necessary to ensure that calls to LockSupport.park do not loop - * indefinitely (park returns immediately if the current thread is - * interrupted). For cases in which task bodies are specified or - * desired to interrupt upon cancellation, ForkJoinTask.cancel can - * be overridden to do so (as is done for invoke{Any,All}). + * InterruptibleTasks + * ==================== + * + * Regular ForkJoinTasks manage task cancellation (method cancel) + * independently from the interrupt status of threads running + * tasks. Interrupts are issued internally only while + * terminating, to wake up workers and cancel queued tasks. By + * default, interrupts are cleared only when necessary to ensure + * that calls to LockSupport.park do not loop indefinitely (park + * returns immediately if the current thread is interrupted). + * + * To comply with ExecutorService specs, we use subclasses of + * abstract class InterruptibleTask for tasks that require + * stronger interruption and cancellation guarantees. External + * submitters never run these tasks, even if in the common pool. + * InterruptibleTasks include a "runner" field (implemented + * similarly to FutureTask) to support cancel(true). Upon pool + * shutdown, runners are interrupted so they can cancel. Since + * external joining callers never run these tasks, they must await + * cancellation by others, which can occur along several different + * paths. + * + * Across these APIs, rules for reporting exceptions for tasks + * with results accessed via join() differ from those via get(), + * which differ from those invoked using pool submit methods by + * non-workers (which comply with Future.get() specs). Internal + * usages of ForkJoinTasks ignore interrupt status when executing + * or awaiting completion. Otherwise, reporting task results or + * exceptions is preferred to throwing InterruptedExecptions, + * which are in turn preferred to timeouts. Similarly, completion + * status is preferred to reporting cancellation. Cancellation is + * reported as an unchecked exception by join(), and by worker + * calls to get(), but is otherwise wrapped in a (checked) + * ExecutionException. + * + * Worker Threads cannot be VirtualThreads, as enforced by + * requiring ForkJoinWorkerThreads in factories. There are + * several constructions relying on this. However as of this + * writing, virtual thread bodies are by default run as some form + * of InterruptibleTask. * * Memory placement * ================ * * Performance is very sensitive to placement of instances of - * ForkJoinPool and WorkQueues and their queue arrays, as well the - * placement of their fields. Caches misses and contention due to - * false-sharing have been observed to slow down some programs by - * more than a factor of four. There is no perfect solution, in - * part because isolating more fields also generates more cache - * misses in more common cases (because some fields snd slots are - * usually read at the same time), and the main means of placing - * memory, the @Contended annotation provides only rough control - * (for good reason). We isolate the ForkJoinPool.ctl field as - * well the set of WorkQueue fields that otherwise cause the most - * false-sharing misses with respect to other fields. Also, - * ForkJoinPool fields are ordered such that fields less prone to - * contention effects are first, offsetting those that otherwise - * would be, while also reducing total footprint vs using - * multiple @Contended regions, which tends to slow down - * less-contended applications. These arrangements mainly reduce - * cache traffic by scanners, which speeds up finding tasks to - * run. Initial sizing and resizing of WorkQueue arrays is an - * even more delicate tradeoff because the best strategy may vary - * across garbage collectors. Small arrays are better for locality - * and reduce GC scan time, but large arrays reduce both direct - * false-sharing and indirect cases due to GC bookkeeping + * ForkJoinPool and WorkQueues and their queue arrays, as well as + * the placement of their fields. Caches misses and contention due + * to false-sharing have been observed to slow down some programs + * by more than a factor of four. Effects may vary across initial + * memory configuarations, applications, and different garbage + * collectors and GC settings, so there is no perfect solution. + * Too much isolation may generate more cache misses in common + * cases (because some fields snd slots are usually read at the + * same time). The @Contended annotation provides only rough + * control (for good reason). Similarly for relying on fields + * being placed in size-sorted declaration order. + * + * For class ForkJoinPool, it is usually more effective to order + * fields such that the most commonly accessed fields are unlikely + * to share cache lines with adjacent objects under JVM layout + * rules. For class WorkQueue, an embedded @Contended region + * segregates fields most heavily updated by owners from those + * most commonly read by stealers or other management. Initial + * sizing and resizing of WorkQueue arrays is an even more + * delicate tradeoff because the best strategy systematically + * varies across garbage collectors. Small arrays are better for + * locality and reduce GC scan time, but large arrays reduce both + * direct false-sharing and indirect cases due to GC bookkeeping * (cardmarks etc), and reduce the number of resizes, which are - * not especially fast because they require atomic transfers, and - * may cause other scanning workers to stall or give up. + * not especially fast because they require atomic transfers. * Currently, arrays are initialized to be fairly small but early * resizes rapidly increase size by more than a factor of two * until very large. (Maintenance note: any changes in fields, - * queues, or their uses must be accompanied by re-evaluation of - * these placement and sizing decisions.) + * queues, or their uses, or JVM layout policies, must be + * accompanied by re-evaluation of these placement and sizing + * decisions.) * * Style notes * =========== @@ -787,7 +919,9 @@ public class ForkJoinPool extends AbstractExecutorService { * other jdk components that require early parallelism. This can * be awkward and ugly, but also reflects the need to control * outcomes across the unusual cases that arise in very racy code - * with very few invariants. All fields are read into locals + * with very few invariants. All atomic task slot updates use + * Unsafe operations requiring offset positions, not indices, as + * computed by method slotOffset. All fields are read into locals * before use, and null-checked if they are references, even if * they can never be null under current usages. Usually, * computations (held in local variables) are defined as soon as @@ -818,7 +952,7 @@ public class ForkJoinPool extends AbstractExecutorService { * perform reasonably even when interpreted (not compiled). * * The order of declarations in this file is (with a few exceptions): - * (1) Static constants + * (1) Static configuration constants * (2) Static utility functions * (3) Nested (static) classes * (4) Fields, along with constants used when unpacking some of them @@ -832,15 +966,19 @@ public class ForkJoinPool extends AbstractExecutorService { * * The main sources of differences from previous version are: * - * * Use of Unsafe vs VarHandle, including re-instatement of some - * constructions from pre-VarHandle versions. - * * Reduced memory and signal contention, mainly by distinguishing - * failure cases. - * * Improved initialization, in part by preparing for possible - * removal of SecurityManager - * * Enable resizing (includes refactoring quiescence/termination) - * * Unification of most internal vs external operations; some made - * possible via use of WorkQueue.access, and POOLSUBMIT status in tasks. + * * New abstract class ForkJoinTask.InterruptibleTask ensures + * handling of tasks submitted under the ExecutorService + * API are consistent with specs. + * * Method quiescent() replaces previous quiescence-related + * checks, relying on versioning and sequence locking instead + * of ReentrantLock. + * * Termination processing now ensures that internal data + * structures are maintained consistently enough while stopping + * to interrupt all workers and cancel all tasks. It also uses a + * CountDownLatch instead of a Condition for termination because + * of lock change. + * * Many other changes to avoid performance regressions due + * to the above. */ // static configuration constants @@ -860,7 +998,7 @@ public class ForkJoinPool extends AbstractExecutorService { * The default value for common pool maxSpares. Overridable using * the "java.util.concurrent.ForkJoinPool.common.maximumSpares" * system property. The default value is far in excess of normal - * requirements, but also far short of MAX_CAP and typical OS + * requirements, but also far short of maximum capacity and typical OS * thread limits, so allows JVMs to catch misuse/abuse before * running out of resources needed to do so. */ @@ -872,26 +1010,42 @@ public class ForkJoinPool extends AbstractExecutorService { */ static final int INITIAL_QUEUE_CAPACITY = 1 << 6; - // Bounds - static final int SWIDTH = 16; // width of short - static final int SMASK = 0xffff; // short bits == max index - static final int MAX_CAP = 0x7fff; // max #workers - 1 - - // pool.runState and workQueue.access bits and sentinels - static final int STOP = 1 << 31; // must be negative - static final int SHUTDOWN = 1; - static final int TERMINATED = 2; - static final int PARKED = -1; // access value when parked - - // {pool, workQueue}.config bits - static final int FIFO = 1 << 16; // fifo queue or access mode - static final int SRC = 1 << 17; // set when stealable - static final int CLEAR_TLS = 1 << 18; // set for Innocuous workers - static final int TRIMMED = 1 << 19; // timed out while idle - static final int ISCOMMON = 1 << 20; // set for common pool - static final int PRESET_SIZE = 1 << 21; // size was set by property - - static final int UNCOMPENSATE = 1 << 16; // tryCompensate return + // conversions among short, int, long + static final int SMASK = 0xffff; // (unsigned) short bits + static final long LMASK = 0xffffffffL; // lower 32 bits of long + static final long UMASK = ~LMASK; // upper 32 bits + + // masks and sentinels for queue indices + static final int MAX_CAP = 0x7fff; // max # workers + static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id + static final int INVALID_ID = 0x4000; // unused external queue id + + // pool.runState bits + static final int STOP = 1 << 0; // terminating + static final int SHUTDOWN = 1 << 1; // terminate when quiescent + static final int TERMINATED = 1 << 2; // only set if STOP also set + static final int RS_LOCK = 1 << 3; // lowest seqlock bit + + // spin/sleep limits for runState locking and elsewhere + static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait + static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos + static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos + + // {pool, workQueue} config bits + static final int FIFO = 1 << 0; // fifo queue or access mode + static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers + static final int PRESET_SIZE = 1 << 2; // size was set by property + + // source history window packing used in scan() and runWorker() + static final long RESCAN = 1L << 63; // must be negative + static final long WMASK = ~(((long)SMASK) << 48); // id bits only + static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd + (((long)INVALID_ID) << 16)); // no 2nd + + // others + static final int DEREGISTERED = 1 << 31; // worker terminating + static final int UNCOMPENSATE = 1 << 16; // tryCompensate return + static final int IDLE = 1 << 16; // phase seqlock/version count /* * Bits and masks for ctl and bounds are packed with 4 16 bit subfields: @@ -911,9 +1065,6 @@ public class ForkJoinPool extends AbstractExecutorService { * Other updates of multiple subfields require CAS. */ - // Lower and upper word masks - static final long SP_MASK = 0xffffffffL; - static final long UC_MASK = ~SP_MASK; // Release counts static final int RC_SHIFT = 48; static final long RC_UNIT = 0x0001L << RC_SHIFT; @@ -922,13 +1073,27 @@ public class ForkJoinPool extends AbstractExecutorService { static final int TC_SHIFT = 32; static final long TC_UNIT = 0x0001L << TC_SHIFT; static final long TC_MASK = 0xffffL << TC_SHIFT; - // sp bits - static final int SS_SEQ = 1 << 16; // version count - static final int INACTIVE = 1 << 31; // phase bit when idle + + /* + * All atomic operations on task arrays (queues) use Unsafe + * operations that take array offsets versus indices, based on + * array base and shift constants established during static + * initialization. + */ + static final long ABASE; + static final int ASHIFT; // Static utilities /** + * Returns the array offset corresponding to the given index for + * Unsafe task queue operations + */ + static long slotOffset(int index) { + return ((long)index << ASHIFT) + ABASE; + } + + /** * If there is a security manager, makes sure caller has * permission to modify threads. */ @@ -1046,72 +1211,80 @@ public class ForkJoinPool extends AbstractExecutorService { * submission. See above for descriptions and algorithms. */ static final class WorkQueue { - int stackPred; // pool stack (ctl) predecessor link - int config; // index, mode, ORed with SRC after init - int base; // index of next slot for poll + // fields declared in order of their likely layout on most VMs + final ForkJoinWorkerThread owner; // null if shared + volatile Thread parker; // set when parking in awaitWork ForkJoinTask<?>[] array; // the queued tasks; power of 2 size - final ForkJoinWorkerThread owner; // owning thread or null if shared + int base; // index of next slot for poll + final int config; // mode bits // fields otherwise causing more unnecessary false-sharing cache misses @jdk.internal.vm.annotation.Contended("w") int top; // index of next slot for push @jdk.internal.vm.annotation.Contended("w") - volatile int access; // values 0, 1 (locked), PARKED, STOP + volatile int phase; // versioned active status @jdk.internal.vm.annotation.Contended("w") - volatile int phase; // versioned, negative if inactive + int stackPred; // pool stack (ctl) predecessor link @jdk.internal.vm.annotation.Contended("w") - volatile int source; // source queue id in topLevelExec + volatile int source; // source queue id (or DEREGISTERED) @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues // Support for atomic operations private static final Unsafe U; - private static final long ACCESS; private static final long PHASE; - private static final long ABASE; - private static final int ASHIFT; + private static final long BASE; + private static final long TOP; + private static final long SOURCE; + private static final long ARRAY; - static ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a, int i) { - return (ForkJoinTask<?>) - U.getAndSetReference(a, ((long)i << ASHIFT) + ABASE, null); + final void updateBase(int v) { + U.putIntVolatile(this, BASE, v); + } + final void updateTop(int v) { + U.putIntOpaque(this, TOP, v); } - static boolean casSlotToNull(ForkJoinTask<?>[] a, int i, - ForkJoinTask<?> c) { - return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE, - c, null); + final void forgetSource() { + U.putIntOpaque(this, SOURCE, 0); } - final void forcePhaseActive() { // clear sign bit - U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff); + final void updateArray(ForkJoinTask<?>[] a) { + U.getAndSetReference(this, ARRAY, a); } - final int getAndSetAccess(int v) { - return U.getAndSetInt(this, ACCESS, v); + final void unlockPhase() { + U.getAndAddInt(this, PHASE, IDLE); } - final void releaseAccess() { - U.putIntRelease(this, ACCESS, 0); + final boolean tryLockPhase() { // seqlock acquire + int p; + return (((p = phase) & IDLE) != 0 && + U.compareAndSetInt(this, PHASE, p, p + IDLE)); } /** - * Constructor. For owned queues, most fields are initialized + * Constructor. For internal queues, most fields are initialized * upon thread start in pool.registerWorker. */ - WorkQueue(ForkJoinWorkerThread owner, int config) { + WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, + boolean clearThreadLocals) { + if (clearThreadLocals) + cfg |= CLEAR_TLS; + this.config = cfg; + top = base = 1; + this.phase = id; this.owner = owner; - this.config = config; - base = top = 1; } /** * Returns an exportable index (used by ForkJoinWorkerThread). */ final int getPoolIndex() { - return (config & 0xffff) >>> 1; // ignore odd/even tag bit + return (phase & 0xffff) >>> 1; // ignore odd/even tag bit } /** * Returns the approximate number of tasks in the queue. */ final int queueSize() { - int unused = access; // for ordering effect + int unused = phase; // for ordering effect return Math.max(top - base, 0); // ignore transient negative } @@ -1119,42 +1292,50 @@ public class ForkJoinPool extends AbstractExecutorService { * Pushes a task. Called only by owner or if already locked * * @param task the task. Caller must ensure non-null. - * @param pool the pool. Must be non-null unless terminating. - * @param signalIfEmpty true if signal when pushing to empty queue - * @throws RejectedExecutionException if array cannot be resized + * @param pool the pool to signal if was previously empty, else null + * @param internal if caller owns this queue + * @throws RejectedExecutionException if array could not be resized */ final void push(ForkJoinTask<?> task, ForkJoinPool pool, - boolean signalIfEmpty) { - boolean resize = false; - int s = top++, b = base, cap, m; ForkJoinTask<?>[] a; - if ((a = array) != null && (cap = a.length) > 0) { - if ((m = (cap - 1)) == s - b) { - resize = true; // rapidly grow until large - int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1; - ForkJoinTask<?>[] newArray; + boolean internal) { + int s = top, b = base, cap, m, room; ForkJoinTask<?>[] a; + if ((a = array) == null || (cap = a.length) <= 0 || + (room = (m = cap - 1) - (s - b)) < 0) { // could not resize + if (!internal) + unlockPhase(); + throw new RejectedExecutionException("Queue capacity exceeded"); + } + top = s + 1; + long pos = slotOffset(m & s); + if (!internal) + U.putReference(a, pos, task); // inside lock + else + U.getAndSetReference(a, pos, task); // fully fenced + if (room == 0) { // resize for next time + int newCap; // rapidly grow until large + if ((newCap = (cap < 1 << 24) ? cap << 2 : cap << 1) > 0) { + ForkJoinTask<?>[] newArray = null; try { newArray = new ForkJoinTask<?>[newCap]; - } catch (Throwable ex) { - top = s; - access = 0; - throw new RejectedExecutionException( - "Queue capacity exceeded"); + } catch (OutOfMemoryError ex) { } - if (newCap > 0) { // always true - int newMask = newCap - 1, k = s; - do { // poll old, push to new - newArray[k-- & newMask] = task; - } while ((task = getAndClearSlot(a, k & m)) != null); + if (newArray != null) { // else throw on next push + int newMask = newCap - 1; // poll old, push to new + for (int k = s, j = cap; j > 0; --j, --k) { + if ((newArray[k & newMask] = + (ForkJoinTask<?>)U.getAndSetReference( + a, slotOffset(k & m), null)) == null) + break; // lost to pollers + } + updateArray(newArray); // fully fenced } - array = newArray; } - else - a[m & s] = task; - getAndSetAccess(0); // for memory effects if owned - if ((resize || (a[m & (s - 1)] == null && signalIfEmpty)) && - pool != null) - pool.signalWork(); } + if (!internal) + unlockPhase(); + if ((room == 0 || room >= m || a[m & (s - 1)] == null) && + pool != null) + pool.signalWork(); } /** @@ -1162,36 +1343,35 @@ public class ForkJoinPool extends AbstractExecutorService { * so acts as either local-pop or local-poll. Called only by owner. * @param fifo nonzero if FIFO mode */ - final ForkJoinTask<?> nextLocalTask(int fifo) { + private ForkJoinTask<?> nextLocalTask(int fifo) { ForkJoinTask<?> t = null; ForkJoinTask<?>[] a = array; - int p = top, s = p - 1, b = base, nb, cap; - if (p - b > 0 && a != null && (cap = a.length) > 0) { - do { + int b = base, p = top, cap; + if (a != null && (cap = a.length) > 0) { + for (int m = cap - 1, s, nb; p - b > 0; ) { if (fifo == 0 || (nb = b + 1) == p) { - if ((t = getAndClearSlot(a, (cap - 1) & s)) != null) - top = s; - break; // lost race for only task + if ((t = (ForkJoinTask<?>)U.getAndSetReference( + a, slotOffset(m & (s = p - 1)), null)) != null) + updateTop(s); // else lost race for only task + break; } - else if ((t = getAndClearSlot(a, (cap - 1) & b)) != null) { - base = nb; + if ((t = (ForkJoinTask<?>)U.getAndSetReference( + a, slotOffset(m & b), null)) != null) { + updateBase(nb); break; } - else { - while (b == (b = base)) { - U.loadFence(); - Thread.onSpinWait(); // spin to reduce memory traffic - } + while (b == (b = base)) { + U.loadFence(); + Thread.onSpinWait(); // spin to reduce memory traffic } - } while (p - b > 0); - U.storeStoreFence(); // for timely index updates + } } return t; } /** * Takes next task, if one exists, using configured mode. - * (Always owned, never called for Common pool.) + * (Always internal, never called for Common pool.) */ final ForkJoinTask<?> nextLocalTask() { return nextLocalTask(config & FIFO); @@ -1199,24 +1379,25 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Pops the given task only if it is at the current top. + * @param task the task. Caller must ensure non-null. + * @param internal if caller owns this queue */ - final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) { + final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) { + boolean taken = false; ForkJoinTask<?>[] a = array; - int p = top, s, cap, k; - if (task != null && base != p && a != null && (cap = a.length) > 0 && - a[k = (cap - 1) & (s = p - 1)] == task) { - if (owned || getAndSetAccess(1) == 0) { - if (top != p || a[k] != task || - getAndClearSlot(a, k) == null) - access = 0; - else { - top = s; - access = 0; - return true; - } + int p = top, s = p - 1, cap, k; + if (a != null && (cap = a.length) > 0 && + a[k = (cap - 1) & s] == task && + (internal || tryLockPhase())) { + if (top == p && + U.compareAndSetReference(a, slotOffset(k), task, null)) { + taken = true; + updateTop(s); } + if (!internal) + unlockPhase(); } - return false; + return taken; } /** @@ -1224,7 +1405,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ final ForkJoinTask<?> peek() { ForkJoinTask<?>[] a = array; - int cfg = config, p = top, b = base, cap; + int b = base, cfg = config, p = top, cap; if (p != b && a != null && (cap = a.length) > 0) { if ((cfg & FIFO) == 0) return a[(cap - 1) & (p - 1)]; @@ -1240,60 +1421,52 @@ public class ForkJoinPool extends AbstractExecutorService { } /** - * Polls for a task. Used only by non-owners in usually - * uncontended contexts. + * Polls for a task. Used only by non-owners. * * @param pool if nonnull, pool to signal if more tasks exist */ final ForkJoinTask<?> poll(ForkJoinPool pool) { - for (int b = base;;) { - int cap; ForkJoinTask<?>[] a; - if ((a = array) == null || (cap = a.length) <= 0) - break; // currently impossible - int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (b != (b = base)) // inconsistent - ; - else if (t != null && casSlotToNull(a, k, t)) { - base = nb; - U.storeFence(); - if (pool != null && a[nk] != null) - pool.signalWork(); // propagate - return t; + for (;;) { + ForkJoinTask<?>[] a = array; + int b = base, cap, k; + if (a == null || (cap = a.length) <= 0) + break; + ForkJoinTask<?> t = a[k = b & (cap - 1)]; + U.loadFence(); + if (base == b) { + Object o; + int nb = b + 1, nk = nb & (cap - 1); + if (t == null) + o = a[k]; + else if (t == (o = U.compareAndExchangeReference( + a, slotOffset(k), t, null))) { + updateBase(nb); + if (a[nk] != null && pool != null) + pool.signalWork(); // propagate + return t; + } + if (o == null && a[nk] == null && array == a && + (phase & (IDLE | 1)) != 0 && top - base <= 0) + break; // empty } - else if (array != a || a[k] != null) - ; // stale - else if (a[nk] == null && top - b <= 0) - break; // empty } return null; } /** - * Tries to poll next task in FIFO order, failing on - * contention or stalls. Used only by topLevelExec to repoll - * from the queue obtained from pool.scan. + * Tries to poll next task in FIFO order, failing without + * retries on contention or stalls. Used only by topLevelExec + * to repoll from the queue obtained from pool.scan. */ - final ForkJoinTask<?> tryPoll() { - int b = base, cap; ForkJoinTask<?>[] a; - if ((a = array) != null && (cap = a.length) > 0) { - for (;;) { - int k = (cap - 1) & b, nb = b + 1; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (b != (b = base)) - ; // inconsistent - else if (t != null) { - if (casSlotToNull(a, k, t)) { - base = nb; - U.storeStoreFence(); - return t; - } - break; // contended - } - else if (a[k] == null) - break; // empty or stalled + private ForkJoinTask<?> tryPoll() { + ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap, k; + if ((a = array) != null && (cap = a.length) > 0 && + (t = a[k = (b = base) & (cap - 1)]) != null) { + U.loadFence(); + if (base == b && + U.compareAndSetReference(a, slotOffset(k), t, null)) { + updateBase(b + 1); + return t; } } return null; @@ -1303,59 +1476,64 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Runs the given (stolen) task if nonnull, as well as - * remaining local tasks and/or others available from its - * source queue, if any. + * remaining local tasks and/or others available from the + * given queue, if any. */ - final void topLevelExec(ForkJoinTask<?> task, WorkQueue src) { - int cfg = config, fifo = cfg & FIFO, nstolen = 1; + final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) { + int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1; + if ((srcId & 1) != 0) // don't record external sources + source = srcId; + if ((cfg & CLEAR_TLS) != 0) + ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); while (task != null) { task.doExec(); - if ((task = nextLocalTask(fifo)) == null && - src != null && (task = src.tryPoll()) != null) + if ((task = nextLocalTask(fifo)) == null && src != null && + (task = src.tryPoll()) != null) ++nstolen; } - nsteals += nstolen; - source = 0; - if ((cfg & CLEAR_TLS) != 0) - ThreadLocalRandom.eraseThreadLocals(Thread.currentThread()); + nsteals = nstolen; + forgetSource(); } /** * Deep form of tryUnpush: Traverses from top and removes and - * runs task if present, shifting others to fill gap. - * @return task status if removed, else 0 + * runs task if present. */ - final int tryRemoveAndExec(ForkJoinTask<?> task, boolean owned) { + final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) { ForkJoinTask<?>[] a = array; - int p = top, s = p - 1, d = p - base, cap; - if (task != null && d > 0 && a != null && (cap = a.length) > 0) { - for (int m = cap - 1, i = s; ; --i) { - ForkJoinTask<?> t; int k; - if ((t = a[k = i & m]) == task) { - if (!owned && getAndSetAccess(1) != 0) - break; // fail if locked - else if (top != p || a[k] != task || - getAndClearSlot(a, k) == null) { - access = 0; - break; // missed - } - else { - if (i != s && i == base) - base = i + 1; // avoid shift - else { - for (int j = i; j != s;) // shift down - a[j & m] = getAndClearSlot(a, ++j & m); - top = s; + int b = base, p = top, s = p - 1, d = p - b, cap; + if (a != null && (cap = a.length) > 0) { + for (int m = cap - 1, i = s; d > 0; --i, --d) { + ForkJoinTask<?> t; int k; boolean taken; + if ((t = a[k = i & m]) == null) + break; + if (t == task) { + long pos = slotOffset(k); + if (!internal && !tryLockPhase()) + break; // fail if locked + if (taken = + (top == p && + U.compareAndSetReference(a, pos, task, null))) { + if (i == s) // act as pop + updateTop(s); + else if (i == base) // act as poll + updateBase(i + 1); + else { // swap with top + U.putReferenceVolatile( + a, pos, (ForkJoinTask<?>) + U.getAndSetReference( + a, slotOffset(s & m), null)); + updateTop(s); } - releaseAccess(); - return task.doExec(); } - } - else if (t == null || --d == 0) + if (!internal) + unlockPhase(); + if (taken) + task.doExec(); break; + } } } - return 0; } /** @@ -1364,39 +1542,45 @@ public class ForkJoinPool extends AbstractExecutorService { * * @param task root of computation * @param limit max runs, or zero for no limit - * @return task status on exit + * @return task status if known to be done */ - final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) { + final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) { int status = 0; if (task != null) { outer: for (;;) { - ForkJoinTask<?>[] a; ForkJoinTask<?> t; - int p, s, cap, k; - if ((status = task.status) < 0) - return status; - if ((a = array) == null || (cap = a.length) <= 0 || - (t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null || - !(t instanceof CountedCompleter)) + ForkJoinTask<?>[] a; ForkJoinTask<?> t; boolean taken; + int stat, p, s, cap, k; + if ((stat = task.status) < 0) { + status = stat; + break; + } + if ((a = array) == null || (cap = a.length) <= 0) + break; + if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null) + break; + if (!(t instanceof CountedCompleter)) break; - for (CountedCompleter<?> f = (CountedCompleter<?>)t;;) { + CountedCompleter<?> f = (CountedCompleter<?>)t; + for (int steps = cap;;) { // bound path if (f == task) break; - else if ((f = f.completer) == null) - break outer; // ineligible + if ((f = f.completer) == null || --steps == 0) + break outer; } - if (!owned && getAndSetAccess(1) != 0) - break; // fail if locked - if (top != p || a[k] != t || getAndClearSlot(a, k) == null) { - access = 0; - break; // missed - } - top = s; - releaseAccess(); + if (!internal && !tryLockPhase()) + break; + if (taken = + (top == p && + U.compareAndSetReference(a, slotOffset(k), t, null))) + updateTop(s); + if (!internal) + unlockPhase(); + if (!taken) + break; t.doExec(); if (limit != 0 && --limit == 0) break; } - status = task.status; } return status; } @@ -1408,67 +1592,50 @@ public class ForkJoinPool extends AbstractExecutorService { * @param blocker the blocker */ final void helpAsyncBlocker(ManagedBlocker blocker) { - if (blocker != null) { - for (;;) { - int b = base, cap; ForkJoinTask<?>[] a; - if ((a = array) == null || (cap = a.length) <= 0 || b == top) - break; - int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (base != b) - ; - else if (blocker.isReleasable()) - break; - else if (a[k] != t) - ; - else if (t != null) { - if (!(t instanceof CompletableFuture - .AsynchronousCompletionTask)) - break; - else if (casSlotToNull(a, k, t)) { - base = nb; - U.storeStoreFence(); - t.doExec(); - } - } - else if (a[nk] == null) + for (;;) { + ForkJoinTask<?>[] a; int b, cap, k; + if ((a = array) == null || (cap = a.length) <= 0) + break; + ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)]; + U.loadFence(); + if (t == null) { + if (top - b <= 0) break; } + else if (!(t instanceof CompletableFuture + .AsynchronousCompletionTask)) + break; + if (blocker != null && blocker.isReleasable()) + break; + if (base == b && t != null && + U.compareAndSetReference(a, slotOffset(k), t, null)) { + updateBase(b + 1); + t.doExec(); + } } } // misc /** - * Returns true if owned by a worker thread and not known to be blocked. + * Returns true if internal and not known to be blocked. */ final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; - return (access != STOP && (wt = owner) != null && + return ((wt = owner) != null && (phase & IDLE) != 0 && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } - /** - * Called in constructors if ThreadLocals not preserved - */ - final void setClearThreadLocals() { - config |= CLEAR_TLS; - } - static { U = Unsafe.getUnsafe(); Class<WorkQueue> klass = WorkQueue.class; - ACCESS = U.objectFieldOffset(klass, "access"); PHASE = U.objectFieldOffset(klass, "phase"); - Class<ForkJoinTask[]> aklass = ForkJoinTask[].class; - ABASE = U.arrayBaseOffset(aklass); - int scale = U.arrayIndexScale(aklass); - ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); - if ((scale & (scale - 1)) != 0) - throw new Error("array index scale not a power of two"); + BASE = U.objectFieldOffset(klass, "base"); + TOP = U.objectFieldOffset(klass, "top"); + SOURCE = U.objectFieldOffset(klass, "source"); + ARRAY = U.objectFieldOffset(klass, "array"); } } @@ -1500,27 +1667,21 @@ public class ForkJoinPool extends AbstractExecutorService { */ static volatile RuntimePermission modifyThreadPermission; - - // Instance fields - volatile long stealCount; // collects worker nsteals - volatile long threadIds; // for worker thread names - final long keepAlive; // milliseconds before dropping if idle - final long bounds; // min, max threads packed as shorts - final int config; // static configuration bits - volatile int runState; // SHUTDOWN, STOP, TERMINATED bits - WorkQueue[] queues; // main registry - final ReentrantLock registrationLock; - Condition termination; // lazily constructed - final String workerNamePrefix; // null for common pool + // fields declared in order of their likely layout on most VMs + volatile CountDownLatch termination; // lazily constructed + final Predicate<? super ForkJoinPool> saturate; final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH - final Predicate<? super ForkJoinPool> saturate; final SharedThreadContainer container; - - @jdk.internal.vm.annotation.Contended("fjpctl") // segregate + final String workerNamePrefix; // null for common pool + WorkQueue[] queues; // main registry + final long keepAlive; // milliseconds before dropping if idle + final long config; // static configuration bits + volatile long stealCount; // collects worker nsteals + volatile long threadIds; // for worker thread names volatile long ctl; // main pool control - @jdk.internal.vm.annotation.Contended("fjpctl") // colocate int parallelism; // target number of workers + volatile int runState; // versioned, lockable // Support for atomic operations private static final Unsafe U; @@ -1528,6 +1689,7 @@ public class ForkJoinPool extends AbstractExecutorService { private static final long RUNSTATE; private static final long PARALLELISM; private static final long THREADIDS; + private static final long TERMINATION; private static final Object POOLIDS_BASE; private static final long POOLIDS; @@ -1540,9 +1702,6 @@ public class ForkJoinPool extends AbstractExecutorService { private long getAndAddCtl(long v) { return U.getAndAddLong(this, CTL, v); } - private int getAndBitwiseOrRunState(int v) { - return U.getAndBitwiseOrInt(this, RUNSTATE, v); - } private long incrementThreadIds() { return U.getAndAddLong(this, THREADIDS, 1L); } @@ -1555,6 +1714,51 @@ public class ForkJoinPool extends AbstractExecutorService { private int getParallelismOpaque() { return U.getIntOpaque(this, PARALLELISM); } + private CountDownLatch cmpExTerminationSignal(CountDownLatch x) { + return (CountDownLatch) + U.compareAndExchangeReference(this, TERMINATION, null, x); + } + + // runState operations + + private int getAndBitwiseOrRunState(int v) { // for status bits + return U.getAndBitwiseOrInt(this, RUNSTATE, v); + } + private boolean casRunState(int c, int v) { + return U.compareAndSetInt(this, RUNSTATE, c, v); + } + private void unlockRunState() { // increment lock bit + U.getAndAddInt(this, RUNSTATE, RS_LOCK); + } + private int lockRunState() { // lock and return current state + int s, u; // locked when RS_LOCK set + if (((s = runState) & RS_LOCK) == 0 && casRunState(s, u = s + RS_LOCK)) + return u; + else + return spinLockRunState(); + } + private int spinLockRunState() { // spin/sleep + for (int waits = 0, s, u;;) { + if (((s = runState) & RS_LOCK) == 0) { + if (casRunState(s, u = s + RS_LOCK)) + return u; + waits = 0; + } else if (waits < SPIN_WAITS) { + ++waits; + Thread.onSpinWait(); + } else { + if (waits < MIN_SLEEP) + waits = MIN_SLEEP; + LockSupport.parkNanos(this, (long)waits); + if (waits < MAX_SLEEP) + waits <<= 1; + } + } + } + + static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask + return p != null && (p.runState & STOP) != 0; + } // Creating, registering, and deregistering workers @@ -1567,12 +1771,16 @@ public class ForkJoinPool extends AbstractExecutorService { */ private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; + SharedThreadContainer ctr = container; Throwable ex = null; ForkJoinWorkerThread wt = null; try { - if (runState >= 0 && // avoid construction if terminating + if ((runState & STOP) == 0 && // avoid construction if terminating fac != null && (wt = fac.newThread(this)) != null) { - container.start(wt); + if (ctr != null) + ctr.start(wt); + else + wt.start(); return true; } } catch (Throwable rex) { @@ -1594,49 +1802,49 @@ public class ForkJoinPool extends AbstractExecutorService { } /** - * Finishes initializing and records owned queue. + * Finishes initializing and records internal queue. * * @param w caller's WorkQueue */ final void registerWorker(WorkQueue w) { - ThreadLocalRandom.localInit(); - int seed = ThreadLocalRandom.getProbe(); - ReentrantLock lock = registrationLock; - int cfg = config & FIFO; - if (w != null && lock != null) { + if (w != null) { w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; - cfg |= w.config | SRC; - w.stackPred = seed; - int id = (seed << 1) | 1; // initial index guess - lock.lock(); + ThreadLocalRandom.localInit(); + int seed = w.stackPred = ThreadLocalRandom.getProbe(); + int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag + int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan + int stop = lockRunState() & STOP; try { - WorkQueue[] qs; int n; // find queue index - if ((qs = queues) != null && (n = qs.length) > 0) { - int k = n, m = n - 1; - for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2); - if (k == 0) - id = n | 1; // resize below - w.phase = w.config = id | cfg; // now publishable - + WorkQueue[] qs; int n; + if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0) { + for (int k = n, m = n - 1; ; id += 2) { + if (qs[id &= m] == null) + break; + if ((k -= 2) <= 0) { + id |= n; + break; + } + } + w.phase = id | phaseSeq; // now publishable if (id < n) qs[id] = w; - else { // expand array + else { // expand int an = n << 1, am = an - 1; WorkQueue[] as = new WorkQueue[an]; as[id & am] = w; for (int j = 1; j < n; j += 2) as[j] = qs[j]; for (int j = 0; j < n; j += 2) { - WorkQueue q; - if ((q = qs[j]) != null) // shared queues may move - as[q.config & am] = q; + WorkQueue q; // shared queues may move + if ((q = qs[j]) != null) + as[q.phase & EXTERNAL_ID_MASK & am] = q; } - U.storeFence(); // fill before publish + U.storeFence(); // fill before publish queues = as; } } } finally { - lock.unlock(); + unlockRunState(); } } } @@ -1651,146 +1859,159 @@ public class ForkJoinPool extends AbstractExecutorService { * @param ex the exception causing failure, or null if none */ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { - WorkQueue w = (wt == null) ? null : wt.workQueue; - int cfg = (w == null) ? 0 : w.config; + WorkQueue w = null; + int src = 0, phase = 0; + boolean replaceable = false; + if (wt != null && (w = wt.workQueue) != null) { + phase = w.phase; + if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout + w.source = DEREGISTERED; + if (phase != 0) { // else failed to start + replaceable = true; + if ((phase & IDLE) != 0) + reactivate(w); // pool stopped before released + } + } + } long c = ctl; - if ((cfg & TRIMMED) == 0) // decrement counts + if (src != DEREGISTERED) // decrement counts do {} while (c != (c = compareAndExchangeCtl( c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | - (SP_MASK & c))))); - else if ((int)c == 0) // was dropped on timeout - cfg &= ~SRC; // suppress signal if last - if (!tryTerminate(false, false) && w != null) { - ReentrantLock lock; WorkQueue[] qs; int n, i; - long ns = w.nsteals & 0xffffffffL; - if ((lock = registrationLock) != null) { - lock.lock(); // remove index unless terminating - if ((qs = queues) != null && (n = qs.length) > 0 && - qs[i = cfg & (n - 1)] == w) - qs[i] = null; - stealCount += ns; // accumulate steals - lock.unlock(); + (LMASK & c))))); + else if ((int)c == 0) // was dropped on timeout + replaceable = false; + if (w != null) { // cancel remaining tasks + for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) { + try { + t.cancel(false); + } catch (Throwable ignore) { + } } - if ((cfg & SRC) != 0) - signalWork(); // possibly replace worker - } - if (ex != null) { - if (w != null) { - w.access = STOP; // cancel tasks - for (ForkJoinTask<?> t; (t = w.nextLocalTask(0)) != null; ) - ForkJoinTask.cancelIgnoringExceptions(t); + } + if ((tryTerminate(false, false) & STOP) == 0 && w != null) { + WorkQueue[] qs; int n, i; // remove index unless terminating + long ns = w.nsteals & 0xffffffffL; + int stop = lockRunState() & STOP; + if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 && + qs[i = phase & SMASK & (n - 1)] == w) { + qs[i] = null; + stealCount += ns; // accumulate steals } - ForkJoinTask.rethrow(ex); + unlockRunState(); } + if ((runState & STOP) == 0 && replaceable) + signalWork(); // may replace unless trimmed or uninitialized + if (ex != null) + ForkJoinTask.rethrow(ex); } - /* + /** * Releases an idle worker, or creates one if not enough exist. */ final void signalWork() { - int pc = parallelism, n; - long c = ctl; - WorkQueue[] qs = queues; - if ((short)(c >>> RC_SHIFT) < pc && qs != null && (n = qs.length) > 0) { - for (;;) { - boolean create = false; - int sp = (int)c & ~INACTIVE; - WorkQueue v = qs[sp & (n - 1)]; - int deficit = pc - (short)(c >>> TC_SHIFT); - long ac = (c + RC_UNIT) & RC_MASK, nc; - if (sp != 0 && v != null) - nc = (v.stackPred & SP_MASK) | (c & TC_MASK); - else if (deficit <= 0) - break; - else { - create = true; - nc = ((c + TC_UNIT) & TC_MASK); - } - if (c == (c = compareAndExchangeCtl(c, nc | ac))) { - if (create) - createWorker(); - else { - Thread owner = v.owner; - v.phase = sp; - if (v.access == PARKED) - LockSupport.unpark(owner); - } + int pc = parallelism; + for (long c = ctl;;) { + WorkQueue[] qs = queues; + long ac = (c + RC_UNIT) & RC_MASK, nc; + int sp = (int)c, i = sp & SMASK; + if (qs == null || qs.length <= i) + break; + WorkQueue w = qs[i], v = null; + if (sp == 0) { + if ((short)(c >>> TC_SHIFT) >= pc) break; - } + nc = ((c + TC_UNIT) & TC_MASK); } - } - } - - /** - * Reactivates any idle worker, if one exists. - * - * @return the signalled worker, or null if none - */ - private WorkQueue reactivate() { - WorkQueue[] qs; int n; - long c = ctl; - if ((qs = queues) != null && (n = qs.length) > 0) { - for (;;) { - int sp = (int)c & ~INACTIVE; - WorkQueue v = qs[sp & (n - 1)]; - long ac = UC_MASK & (c + RC_UNIT); - if (sp == 0 || v == null) - break; - if (c == (c = compareAndExchangeCtl( - c, (v.stackPred & SP_MASK) | ac))) { - Thread owner = v.owner; + else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null) + break; + else + nc = (v.stackPred & LMASK) | (c & TC_MASK); + if (c == (c = compareAndExchangeCtl(c, nc | ac))) { + if (v == null) + createWorker(); + else { + Thread t; v.phase = sp; - if (v.access == PARKED) - LockSupport.unpark(owner); - return v; + if ((t = v.parker) != null) + U.unpark(t); } + break; } } - return null; } /** - * Tries to deactivate worker w; called only on idle timeout. + * Reactivates the given worker, and possibly interrupts others if + * not top of ctl stack. Called only during shutdown to ensure release + * on termination. */ - private boolean tryTrim(WorkQueue w) { - if (w != null) { - int pred = w.stackPred, cfg = w.config | TRIMMED; - long c = ctl; - int sp = (int)c & ~INACTIVE; - if ((sp & SMASK) == (cfg & SMASK) && - compareAndSetCtl(c, ((pred & SP_MASK) | - (UC_MASK & (c - TC_UNIT))))) { - w.config = cfg; // add sentinel for deregisterWorker - w.phase = sp; - return true; + private void reactivate(WorkQueue w) { + for (long c = ctl;;) { + WorkQueue[] qs; WorkQueue v; int sp, i; + if ((qs = queues) == null || (sp = (int)c) == 0 || + qs.length <= (i = sp & SMASK) || (v = qs[i]) == null || + (v != w && w != null && (w.phase & IDLE) == 0)) + break; + if (c == (c = compareAndExchangeCtl( + c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | + (v.stackPred & LMASK))))) { + Thread t; + v.phase = sp; + if ((t = v.parker) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + if (v == w) + break; } } - return false; } /** - * Returns true if any queue is detectably nonempty. Accurate - * only when workers are quiescent; else conservatively - * approximate. - * @param submissionsOnly if true, only check submission queues - */ - private boolean hasTasks(boolean submissionsOnly) { - int step = submissionsOnly ? 2 : 1; - for (int checkSum = 0;;) { // repeat until stable (normally twice) - U.loadFence(); - WorkQueue[] qs = queues; - int n = (qs == null) ? 0 : qs.length, sum = 0; - for (int i = 0; i < n; i += step) { - WorkQueue q; int s; - if ((q = qs[i]) != null) { - if (q.access > 0 || (s = q.top) != q.base) - return true; - sum += (s << 16) + i + 1; + * Internal version of isQuiescent and related functionality. + * @return true if terminating or all workers are inactive and + * submission queues are empty and unlocked; if so, setting STOP + * if shutdown is enabled + */ + private boolean quiescent() { + outer: for (;;) { + long phaseSum = 0L; + boolean swept = false; + for (int e, prevRunState = 0; ; prevRunState = e) { + long c = ctl; + if (((e = runState) & STOP) != 0) + return true; // terminating + else if ((c & RC_MASK) > 0L) + return false; // at least one active + else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) { + long sum = c; + WorkQueue[] qs = queues; WorkQueue q; + int n = (qs == null) ? 0 : qs.length; + for (int i = 0; i < n; ++i) { // scan queues + if ((q = qs[i]) != null) { + int p = q.phase, s = q.top, b = q.base; + sum += (p & 0xffffffffL) | ((long)b << 32); + if ((p & IDLE) == 0 || s - b > 0) { + if ((i & 1) == 0 && compareAndSetCtl(c, c)) + signalWork(); // ensure live + return false; + } + } + } + swept = (phaseSum == (phaseSum = sum)); } + else if (compareAndSetCtl(c, c) && // confirm + casRunState(e, (e & SHUTDOWN) != 0 ? e | STOP : e)) { + if ((e & SHUTDOWN) != 0) // enable termination + interruptAll(); + return true; + } + else + break; // restart } - if (checkSum == (checkSum = sum)) - return false; } } @@ -1801,129 +2022,139 @@ public class ForkJoinPool extends AbstractExecutorService { * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { - if (w != null) { // skip on failed init - int r = w.stackPred, src = 0; // use seed from registerWorker - do { - r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - } while ((src = scan(w, src, r)) >= 0 || - (src = awaitWork(w)) == 0); - w.access = STOP; // record normal termination + if (w != null) { + int phase = w.phase, r = w.stackPred; // seed from registerWorker + for (long window = NO_HISTORY | (r >>> 16);;) { + r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + if ((runState & STOP) != 0) // terminating + break; + if (window == (window = scan(w, window & WMASK, r)) && + window >= 0L && phase != (phase = awaitWork(w, phase))) { + if ((phase & IDLE) != 0) + break; // worker exit + window = NO_HISTORY | (window & SMASK); // clear history + } + } } } /** * Scans for and if found executes top-level tasks: Tries to poll - * each queue starting at a random index with random stride, - * returning source id or retry indicator. + * each queue starting at initial index with random stride, + * returning next scan window and retry indicator. * * @param w caller's WorkQueue - * @param prevSrc the two previous queues (if nonzero) stolen from in current phase, packed as int + * @param window up to three queue indices * @param r random seed - * @return the next prevSrc value to use, or negative if none found + * @return the next window to use, with RESCAN set for rescan */ - private int scan(WorkQueue w, int prevSrc, int r) { + private long scan(WorkQueue w, long window, int r) { WorkQueue[] qs = queues; - int n = (w == null || qs == null) ? 0 : qs.length; - for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) { + int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1; + outer: for (int i = (short)window, l = n; l > 0; --l, i += step) { int j, cap; WorkQueue q; ForkJoinTask<?>[] a; - if ((q = qs[j = r & (n - 1)]) != null && + if ((q = qs[j = i & SMASK & (n - 1)]) != null && (a = q.array) != null && (cap = a.length) > 0) { - int src = j | SRC, b = q.base; - int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (q.base != b) // inconsistent - return prevSrc; - else if (t != null && WorkQueue.casSlotToNull(a, k, t)) { - q.base = nb; - w.source = src; - if (src + (src << SWIDTH) != prevSrc && - q.base == nb && a[nk] != null) - signalWork(); // propagate at most twice/run - w.topLevelExec(t, q); - return src + (prevSrc << SWIDTH); - } - else if (q.array != a || a[k] != null || a[nk] != null) - return prevSrc; // revisit - } - } - return -1; - } - - /** - * Advances phase, enqueues, and awaits signal or termination. - * - * @return negative if terminated, else 0 - */ - private int awaitWork(WorkQueue w) { - if (w == null) - return -1; // currently impossible - int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase - boolean idle = false; // true if possibly quiescent - if (runState < 0) - return -1; // terminating - long sp = p & SP_MASK, pc = ctl, qc; - w.phase = p | INACTIVE; - do { // enqueue - w.stackPred = (int)pc; // set ctl stack link - } while (pc != (pc = compareAndExchangeCtl( - pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp))); - if ((qc & RC_MASK) <= 0L) { - if (hasTasks(true) && (w.phase >= 0 || reactivate() == w)) - return 0; // check for stragglers - if (runState != 0 && tryTerminate(false, false)) - return -1; // quiescent termination - idle = true; - } - WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal - int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf; - while ((p = w.phase) < 0 && --spins > 0) - Thread.onSpinWait(); - if (p < 0) { - long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L; - LockSupport.setCurrentBlocker(this); - for (;;) { // await signal or termination - if (runState < 0) - return -1; - w.access = PARKED; // enable unpark - if (w.phase < 0) { - if (idle) - LockSupport.parkUntil(deadline); - else - LockSupport.park(); - } - w.access = 0; // disable unpark - if (w.phase >= 0) { - LockSupport.setCurrentBlocker(null); - break; - } - Thread.interrupted(); // clear status for next park - if (idle) { // check for idle timeout - if (deadline - System.currentTimeMillis() < TIMEOUT_SLOP) { - if (tryTrim(w)) - return -1; - else // not at head; restart timer - deadline += keepAlive; + for (;;) { + int b, k; Object o; + ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; + U.loadFence(); // re-read b and t + if (q.base == b) { // else inconsistent; retry + int nb = b + 1, nk = nb & (cap - 1); + if (t == null) { + if (a[k] == null) { // revisit if another task + if (window >= 0L && a[nk] != null) + window |= RESCAN; + break; + } + } + else if (t == (o = U.compareAndExchangeReference( + a, slotOffset(k), t, null))) { + q.updateBase(nb); + long pw = window, nw = ((pw << 16) | j) & WMASK; + window = nw | RESCAN; + if ((nw != pw || (short)(nw >>> 32) != j) && + a[nk] != null) + signalWork(); // limit propagation + if (w != null) // always true + w.topLevelExec(t, q, j); + break outer; + } + else if (o == null) // contended + break; // retried unless newly active } } } } - return 0; + return window; } /** - * Non-overridable version of isQuiescent. Returns true if - * quiescent or already terminating. + * Tries to inactivate, and if successful, awaits signal or termination. + * + * @param w the worker (may be null if already terminated) + * @param p current phase + * @return current phase, with IDLE set if worker should exit */ - private boolean canStop() { - long c = ctl; - do { - if (runState < 0) - break; - if ((c & RC_MASK) > 0L || hasTasks(false)) - return false; - } while (c != (c = ctl)); // validate - return true; + private int awaitWork(WorkQueue w, int p) { + if (w != null) { + int idlePhase = p + IDLE, nextPhase = p + (IDLE << 1); + long pc = ctl, qc = (nextPhase & LMASK) | ((pc - RC_UNIT) & UMASK); + w.stackPred = (int)pc; // set ctl stack link + w.phase = idlePhase; // try to inactivate + if (!compareAndSetCtl(pc, qc)) // contended enque + return w.phase = p; // back out + int ac = (short)(qc >>> RC_SHIFT); + boolean quiescent = (ac <= 0 && quiescent()); + if ((runState & STOP) != 0) + return idlePhase; + int spins = ac + ((((int)(qc >>> TC_SHIFT)) & SMASK) << 1); + while ((p = w.phase) == idlePhase && --spins > 0) + Thread.onSpinWait(); // spin for approx #accesses to signal + if (p == idlePhase) { + long deadline = (!quiescent ? 0L : // timeout for trim + System.currentTimeMillis() + keepAlive); + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + for (int i = 0; i < n; ++i) { // recheck queues + WorkQueue q; ForkJoinTask<?>[] a; int cap; + if ((q = qs[i]) != null && + (a = q.array) != null && (cap = a.length) > 0 && + a[q.base & (cap - 1)] != null && + ctl == qc && compareAndSetCtl(qc, pc)) { + w.phase = (int)qc; // release + break; + } + } + if ((p = w.phase) == idlePhase) { // emulate LockSupport.park + LockSupport.setCurrentBlocker(this); + w.parker = Thread.currentThread(); + for (;;) { + if ((runState & STOP) != 0 || (p = w.phase) != idlePhase) + break; + U.park(quiescent, deadline); + if ((p = w.phase) != idlePhase || (runState & STOP) != 0) + break; + Thread.interrupted(); // clear for next park + if (quiescent && TIMEOUT_SLOP > + deadline - System.currentTimeMillis()) { + long sp = w.stackPred & LMASK; + long c = ctl, nc = sp | (UMASK & (c - TC_UNIT)); + if (((int)c & SMASK) == (idlePhase & SMASK) && + compareAndSetCtl(c, nc)) { + w.source = DEREGISTERED; + w.phase = (int)c; + break; + } + deadline += keepAlive; // not head; reset timer + } + } + w.parker = null; + LockSupport.setCurrentBlocker(null); + } + } + } + return p; } /** @@ -1934,16 +2165,18 @@ public class ForkJoinPool extends AbstractExecutorService { * @param submissionsOnly if true, only scan submission queues */ private ForkJoinTask<?> pollScan(boolean submissionsOnly) { - int r = ThreadLocalRandom.nextSecondarySeed(); - if (submissionsOnly) // even indices only - r &= ~1; - int step = (submissionsOnly) ? 2 : 1; - WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t; - if (runState >= 0 && (qs = queues) != null && (n = qs.length) > 0) { - for (int i = n; i > 0; i -= step, r += step) { - if ((q = qs[r & (n - 1)]) != null && - (t = q.poll(this)) != null) - return t; + if ((runState & STOP) == 0) { + WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t; + int r = ThreadLocalRandom.nextSecondarySeed(); + if (submissionsOnly) // even indices only + r &= ~1; + int step = (submissionsOnly) ? 2 : 1; + if ((qs = queues) != null && (n = qs.length) > 0) { + for (int i = n; i > 0; i -= step, r += step) { + if ((q = qs[r & (n - 1)]) != null && + (t = q.poll(this)) != null) + return t; + } } } return null; @@ -1958,47 +2191,48 @@ public class ForkJoinPool extends AbstractExecutorService { * unblocked. * * @param c incoming ctl value - * @param canSaturate to override saturate predicate * @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry */ - private int tryCompensate(long c, boolean canSaturate) { + private int tryCompensate(long c) { Predicate<? super ForkJoinPool> sat; - long b = bounds; // unpack fields - int pc = parallelism; - int minActive = (short)(b & SMASK), - maxTotal = (short)(b >>> SWIDTH) + pc, + long b = config; + int pc = parallelism, // unpack fields + minActive = (short)(b >>> RC_SHIFT), + maxTotal = (short)(b >>> TC_SHIFT) + pc, active = (short)(c >>> RC_SHIFT), total = (short)(c >>> TC_SHIFT), - sp = (int)c & ~INACTIVE; - if (sp != 0 && active <= pc) { // activate idle worker - WorkQueue[] qs; WorkQueue v; int i; - if (ctl == c && (qs = queues) != null && - qs.length > (i = sp & SMASK) && (v = qs[i]) != null) { - long nc = (v.stackPred & SP_MASK) | (UC_MASK & c); - if (compareAndSetCtl(c, nc)) { - v.phase = sp; - LockSupport.unpark(v.owner); - return UNCOMPENSATE; - } + sp = (int)c, + stat = -1; // default retry return + if (sp != 0 && active <= pc) { // activate idle worker + WorkQueue[] qs; WorkQueue v; int i; Thread t; + if ((qs = queues) != null && qs.length > (i = sp & SMASK) && + (v = qs[i]) != null && + compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { + v.phase = sp; + if ((t = v.parker) != null) + U.unpark(t); + stat = UNCOMPENSATE; } - return -1; // retry } else if (active > minActive && total >= pc) { // reduce active workers - long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); - return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1; + if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) + stat = UNCOMPENSATE; } - else if (total < maxTotal && total < MAX_CAP) { // expand pool + else if (total < maxTotal && total < MAX_CAP) { // try to expand pool long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - return (!compareAndSetCtl(c, nc) ? -1 : - !createWorker() ? 0 : UNCOMPENSATE); + if ((runState & STOP) != 0) // terminating + stat = 0; + else if (compareAndSetCtl(c, nc)) + stat = createWorker() ? UNCOMPENSATE : 0; } else if (!compareAndSetCtl(c, c)) // validate - return -1; - else if (canSaturate || ((sat = saturate) != null && sat.test(this))) - return 0; + ; + else if ((sat = saturate) != null && sat.test(this)) + stat = 0; else throw new RejectedExecutionException( "Thread limit exceeded replacing blocked worker"); + return stat; } /** @@ -2016,234 +2250,256 @@ public class ForkJoinPool extends AbstractExecutorService { * * @param task the task * @param w caller's WorkQueue - * @param timed true if this is a timed join + * @param internal true if w is owned by a ForkJoinWorkerThread * @return task status on exit, or UNCOMPENSATE for compensated blocking */ - final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean timed) { - if (w == null || task == null) - return 0; - int wsrc = w.source, wid = (w.config & SMASK) | SRC, r = wid + 2; - long sctl = 0L; // track stability - for (boolean rescan = true;;) { - int s; WorkQueue[] qs; - if ((s = task.status) < 0) - return s; - if (!rescan && sctl == (sctl = ctl)) { - if (runState < 0) - return 0; - if ((s = tryCompensate(sctl, timed)) >= 0) - return s; // block - } - rescan = false; - int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1; - scan: for (int i = n >>> 1; i > 0; --i, r += 2) { - int j, cap; WorkQueue q; ForkJoinTask<?>[] a; - if ((q = qs[j = r & m]) != null && (a = q.array) != null && - (cap = a.length) > 0) { - for (int src = j | SRC;;) { - int sq = q.source, b = q.base; - int k = (cap - 1) & b, nb = b + 1; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - boolean eligible = true; // check steal chain - for (int d = n, v = sq;;) { // may be cyclic; bound - WorkQueue p; - if (v == wid) - break; - if (v == 0 || --d == 0 || (p = qs[v & m]) == null) { - eligible = false; + + final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) { + if (w != null) + w.tryRemoveAndExec(task, internal); + int s = 0; + if (task != null && (s = task.status) >= 0 && internal && w != null) { + int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source; + long sctl = 0L; // track stability + outer: for (boolean rescan = true;;) { + if ((s = task.status) < 0) + break; + if (!rescan) { + if ((runState & STOP) != 0) + break; + if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0) + break; + } + rescan = false; + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + scan: for (int l = n >>> 1; l > 0; --l, r += 2) { + int j; WorkQueue q; + if ((q = qs[j = r & SMASK & (n - 1)]) != null) { + for (;;) { + int sq = q.source, b, cap, k; ForkJoinTask<?>[] a; + if ((a = q.array) == null || (cap = a.length) <= 0) break; + ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; + U.loadFence(); + boolean eligible = false; + if (t == task) + eligible = true; + else if (t != null) { // check steal chain + for (int v = sq, d = cap;;) { + WorkQueue p; + if (v == wid) { + eligible = true; + break; + } + if ((v & 1) == 0 || // external or none + --d < 0 || // bound depth + (p = qs[v & (n - 1)]) == null) + break; + v = p.source; + } } - v = p.source; - } - if (q.source != sq || q.base != b) - ; // stale - else if ((s = task.status) < 0) - return s; // recheck before taking - else if (t == null) { - if (a[k] == null) { - if (!rescan && eligible && - (q.array != a || q.top != b)) - rescan = true; // resized or stalled - break; + if ((s = task.status) < 0) + break outer; // validate + if (q.source == sq && q.base == b && a[k] == t) { + int nb = b + 1, nk = nb & (cap - 1); + if (!eligible) { // revisit if nonempty + if (!rescan && t == null && + (a[nk] != null || q.top - b > 0)) + rescan = true; + break; + } + if (U.compareAndSetReference( + a, slotOffset(k), t, null)) { + q.updateBase(nb); + w.source = j; + t.doExec(); + w.source = wsrc; + rescan = true; // restart at index r + break scan; + } } } - else if (t != task && !eligible) - break; - else if (WorkQueue.casSlotToNull(a, k, t)) { - q.base = nb; - w.source = src; - t.doExec(); - w.source = wsrc; - rescan = true; - break scan; - } } } } } + return s; } /** * Version of helpJoin for CountedCompleters. * - * @param task the task + * @param task root of computation (only called when a CountedCompleter) * @param w caller's WorkQueue - * @param owned true if w is owned by a ForkJoinWorkerThread - * @param timed true if this is a timed join + * @param internal true if w is owned by a ForkJoinWorkerThread * @return task status on exit, or UNCOMPENSATE for compensated blocking */ - final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned, - boolean timed) { - if (w == null || task == null) - return 0; - int wsrc = w.source, r = w.config; - long sctl = 0L; // track stability - for (boolean rescan = true;;) { - int s; WorkQueue[] qs; - if ((s = w.helpComplete(task, owned, 0)) < 0) - return s; - if (!rescan && sctl == (sctl = ctl)) { - if (!owned || runState < 0) - return 0; - if ((s = tryCompensate(sctl, timed)) >= 0) - return s; - } - rescan = false; - int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1; - scan: for (int i = n; i > 0; --i, ++r) { - int j, cap; WorkQueue q; ForkJoinTask<?>[] a; - if ((q = qs[j = r & m]) != null && (a = q.array) != null && - (cap = a.length) > 0) { - poll: for (int src = j | SRC, b = q.base;;) { - int k = (cap - 1) & b, nb = b + 1; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (b != (b = q.base)) - ; // stale - else if ((s = task.status) < 0) - return s; // recheck before taking - else if (t == null) { - if (a[k] == null) { - if (!rescan && // resized or stalled - (q.array != a || q.top != b)) - rescan = true; + final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) { + int s = 0; + if (task != null && (s = task.status) >= 0 && w != null) { + int r = w.phase + 1; // for indexing + long sctl = 0L; // track stability + outer: for (boolean rescan = true, locals = true;;) { + if (locals && (s = w.helpComplete(task, internal, 0)) < 0) + break; + if ((s = task.status) < 0) + break; + if (!rescan) { + if ((runState & STOP) != 0) + break; + if (sctl == (sctl = ctl) && + (!internal || (s = tryCompensate(sctl)) >= 0)) + break; + } + rescan = locals = false; + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + scan: for (int l = n; l > 0; --l, ++r) { + int j; WorkQueue q; + if ((q = qs[j = r & SMASK & (n - 1)]) != null) { + for (;;) { + ForkJoinTask<?>[] a; int b, cap, k; + if ((a = q.array) == null || (cap = a.length) <= 0) break; + ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; + U.loadFence(); + boolean eligible = false; + if (t instanceof CountedCompleter) { + CountedCompleter<?> f = (CountedCompleter<?>)t; + for (int steps = cap; steps > 0; --steps) { + if (f == task) { + eligible = true; + break; + } + if ((f = f.completer) == null) + break; + } } - } - else if (t instanceof CountedCompleter) { - CountedCompleter<?> f; - for (f = (CountedCompleter<?>)t;;) { - if (f == task) + if ((s = task.status) < 0) // validate + break outer; + if (q.base == b) { + int nb = b + 1, nk = nb & (cap - 1); + if (eligible) { + if (U.compareAndSetReference( + a, slotOffset(k), t, null)) { + q.updateBase(nb); + t.doExec(); + locals = rescan = true; + break scan; + } + } + else if (a[k] == t) { + if (!rescan && t == null && + (a[nk] != null || q.top - b > 0)) + rescan = true; // revisit break; - else if ((f = f.completer) == null) - break poll; // ineligible - } - if (WorkQueue.casSlotToNull(a, k, t)) { - q.base = nb; - w.source = src; - t.doExec(); - w.source = wsrc; - rescan = true; - break scan; + } } } - else - break; } } } } + return s; } /** - * Runs tasks until {@code isQuiescent()}. Rather than blocking - * when tasks cannot be found, rescans until all others cannot - * find tasks either. + * Runs tasks until all workers are inactive and no tasks are + * found. Rather than blocking when tasks cannot be found, rescans + * until all others cannot find tasks either. * * @param nanos max wait time (Long.MAX_VALUE if effectively untimed) * @param interruptible true if return on interrupt * @return positive if quiescent, negative if interrupted, else 0 */ private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) { - long startTime = System.nanoTime(), parkTime = 0L; - int phase; // w.phase set negative when temporarily quiescent - if (w == null || (phase = w.phase) < 0) + int phase; // w.phase inactive bit set when temporarily quiescent + if (w == null || ((phase = w.phase) & IDLE) != 0) return 0; - int activePhase = phase, inactivePhase = phase | INACTIVE; - int wsrc = w.source, r = 0; - for (boolean locals = true;;) { - WorkQueue[] qs; WorkQueue q; - if (runState < 0) { // terminating - w.phase = activePhase; - return 1; + int wsrc = w.source; + long startTime = System.nanoTime(); + long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos + long prevSum = 0L; + int activePhase = phase, inactivePhase = phase + IDLE; + int r = phase + 1, waits = 0, returnStatus = 1; + boolean locals = true; + for (int e = runState;;) { + if ((e & STOP) != 0) + break; // terminating + if (interruptible && Thread.interrupted()) { + returnStatus = -1; + break; } if (locals) { // run local tasks before (re)polling + locals = false; for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;) u.doExec(); } - boolean rescan = false, busy = locals = false, interrupted; - int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1; - scan: for (int i = n, j; i > 0; --i, ++r) { - if ((q = qs[j = m & r]) != null && q != w) { - for (int src = j | SRC;;) { - ForkJoinTask<?>[] a = q.array; - int b = q.base, cap; - if (a == null || (cap = a.length) <= 0) - break; - int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb; - ForkJoinTask<?> t = a[k]; - U.loadFence(); // for re-reads - if (q.base != b || q.array != a || a[k] != t) - ; - else if (t == null) { - if (!rescan) { - if (a[nk] != null || q.top - b > 0) - rescan = true; - else if (!busy && - q.owner != null && q.phase >= 0) - busy = true; - } + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + long phaseSum = 0L; + boolean rescan = false, busy = false; + scan: for (int l = n; l > 0; --l, ++r) { + int j; WorkQueue q; + if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) { + for (;;) { + ForkJoinTask<?>[] a; int b, cap, k; + if ((a = q.array) == null || (cap = a.length) <= 0) break; - } - else if (phase < 0) // reactivate before taking + ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)]; + if (t != null && phase == inactivePhase) // reactivate w.phase = phase = activePhase; - else if (WorkQueue.casSlotToNull(a, k, t)) { - q.base = nb; - w.source = src; - t.doExec(); - w.source = wsrc; - rescan = locals = true; - break scan; + U.loadFence(); + if (q.base == b && a[k] == t) { + int nb = b + 1; + if (t == null) { + if (!rescan) { + int qp = q.phase, mq = qp & (IDLE | 1); + phaseSum += qp; + if (mq == 0 || q.top - b > 0) + rescan = true; + else if (mq == 1) + busy = true; + } + break; + } + if (U.compareAndSetReference( + a, slotOffset(k), t, null)) { + q.updateBase(nb); + w.source = j; + t.doExec(); + w.source = wsrc; + rescan = locals = true; + break scan; + } } } } } - if (rescan) - ; // retry - else if (phase >= 0) { - parkTime = 0L; + if (e != (e = runState) || prevSum != (prevSum = phaseSum) || + rescan || (e & RS_LOCK) != 0) + ; // inconsistent + else if (!busy) + break; + else if (phase == activePhase) { + waits = 0; // recheck, then sleep w.phase = phase = inactivePhase; } - else if (!busy) { - w.phase = activePhase; - return 1; - } - else if (parkTime == 0L) { - parkTime = 1L << 10; // initially about 1 usec - Thread.yield(); - } - else if ((interrupted = interruptible && Thread.interrupted()) || - System.nanoTime() - startTime > nanos) { - w.phase = activePhase; - return interrupted ? -1 : 0; + else if (System.nanoTime() - startTime > nanos) { + returnStatus = 0; // timed out + break; } + else if (waits == 0) // same as spinLockRunState except + waits = MIN_SLEEP; // with rescan instead of onSpinWait else { - LockSupport.parkNanos(this, parkTime); - if (parkTime < nanos >>> 8 && parkTime < 1L << 20) - parkTime <<= 1; // max sleep approx 1 sec or 1% nanos + LockSupport.parkNanos(this, (long)waits); + if (waits < maxSleep) + waits <<= 1; } } + w.phase = activePhase; + return returnStatus; } /** @@ -2254,28 +2510,31 @@ public class ForkJoinPool extends AbstractExecutorService { * @return positive if quiescent, negative if interrupted, else 0 */ private int externalHelpQuiesce(long nanos, boolean interruptible) { - for (long startTime = System.nanoTime(), parkTime = 0L;;) { - ForkJoinTask<?> t; - if ((t = pollScan(false)) != null) { - t.doExec(); - parkTime = 0L; - } - else if (canStop()) - return 1; - else if (parkTime == 0L) { - parkTime = 1L << 10; - Thread.yield(); - } - else if ((System.nanoTime() - startTime) > nanos) - return 0; - else if (interruptible && Thread.interrupted()) - return -1; - else { - LockSupport.parkNanos(this, parkTime); - if (parkTime < nanos >>> 8 && parkTime < 1L << 20) - parkTime <<= 1; + if (!quiescent()) { + long startTime = System.nanoTime(); + long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); + for (int waits = 0;;) { + ForkJoinTask<?> t; + if (interruptible && Thread.interrupted()) + return -1; + else if ((t = pollScan(false)) != null) { + waits = 0; + t.doExec(); + } + else if (quiescent()) + break; + else if (System.nanoTime() - startTime > nanos) + return 0; + else if (waits == 0) + waits = MIN_SLEEP; + else { + LockSupport.parkNanos(this, (long)waits); + if (waits < maxSleep) + waits <<= 1; + } } } + return 1; } /** @@ -2316,59 +2575,69 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Finds and locks a WorkQueue for an external submitter, or * throws RejectedExecutionException if shutdown or terminating. + * @param r current ThreadLocalRandom.getProbe() value * @param isSubmit false if this is for a common pool fork */ - final WorkQueue submissionQueue(boolean isSubmit) { - int r; - ReentrantLock lock = registrationLock; - if ((r = ThreadLocalRandom.getProbe()) == 0) { + private WorkQueue submissionQueue(int r) { + if (r == 0) { ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - if (lock != null) { // else init error - for (int id = r << 1;;) { // even indices only - int n, i; WorkQueue[] qs; WorkQueue q; - if ((qs = queues) == null || (n = qs.length) <= 0) - break; - else if ((q = qs[i = (n - 1) & id]) == null) { - WorkQueue w = new WorkQueue(null, id | SRC); - w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; - lock.lock(); // install under lock - if (queues == qs && qs[i] == null) - qs[i] = w; // else lost race; discard - lock.unlock(); - } - else if (q.getAndSetAccess(1) != 0) // move and restart - id = (r = ThreadLocalRandom.advanceProbe(r)) << 1; - else if (isSubmit && runState != 0) { - q.access = 0; // check while lock held - break; - } - else + for (;;) { + int n, i, id; WorkQueue[] qs; WorkQueue q; + if ((qs = queues) == null) + break; + if ((n = qs.length) <= 0) + break; + if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) { + WorkQueue w = new WorkQueue(null, id, 0, false); + w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; + int stop = lockRunState() & STOP; + if (stop == 0 && queues == qs && qs[i] == null) + q = qs[i] = w; // else discard; retry + unlockRunState(); + if (q != null) return q; + if (stop != 0) + break; + } + else if (!q.tryLockPhase()) // move index + r = ThreadLocalRandom.advanceProbe(r); + else if ((runState & SHUTDOWN) != 0) { + q.unlockPhase(); // check while q lock held + break; } + else + return q; } + tryTerminate(false, false); throw new RejectedExecutionException(); } - /** - * Pushes a submission to the pool, using internal queue if called - * from ForkJoinWorkerThread, else external queue. - */ - private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, - ForkJoinTask<T> task) { - WorkQueue q; Thread t; ForkJoinWorkerThread wt; - U.storeStoreFence(); // ensure safely publishable - if (task == null) throw new NullPointerException(); + private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) { + Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal; if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && - (wt = (ForkJoinWorkerThread)t).pool == this) + (wt = (ForkJoinWorkerThread)t).pool == this) { + internal = true; q = wt.workQueue; - else { - task.markPoolSubmission(); - q = submissionQueue(true); } - q.push(task, this, signalIfEmpty); - return task; + else { // find and lock queue + internal = false; + q = submissionQueue(ThreadLocalRandom.getProbe()); + } + q.push(task, signalIfEmpty ? this : null, internal); + } + + /** + * Returns queue for an external submission, bypassing call to + * submissionQueue if already established and unlocked. + */ + final WorkQueue externalSubmissionQueue() { + WorkQueue[] qs; WorkQueue q; int n; + int r = ThreadLocalRandom.getProbe(); + return (((qs = queues) != null && (n = qs.length) > 0 && + (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 && + q.tryLockPhase()) ? q : submissionQueue(r)); } /** @@ -2376,12 +2645,12 @@ public class ForkJoinPool extends AbstractExecutorService { * possibly ever submitted to the given pool (nonzero probe), or * null if none. */ - private static WorkQueue externalQueue(ForkJoinPool p) { - WorkQueue[] qs; - int r = ThreadLocalRandom.getProbe(), n; + static WorkQueue externalQueue(ForkJoinPool p) { + WorkQueue[] qs; int n; + int r = ThreadLocalRandom.getProbe(); return (p != null && (qs = p.queues) != null && (n = qs.length) > 0 && r != 0) ? - qs[(n - 1) & (r << 1)] : null; + qs[r & EXTERNAL_ID_MASK & (n - 1)] : null; } /** @@ -2392,25 +2661,17 @@ public class ForkJoinPool extends AbstractExecutorService { } /** - * Returns queue for an external thread, if one exists - */ - final WorkQueue externalQueue() { - return externalQueue(this); - } - - /** * If the given executor is a ForkJoinPool, poll and execute * AsynchronousCompletionTasks from worker's queue until none are * available or blocker is released. */ static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) { WorkQueue w = null; Thread t; ForkJoinWorkerThread wt; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - if ((wt = (ForkJoinWorkerThread)t).pool == e) - w = wt.workQueue; - } + if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && + (wt = (ForkJoinWorkerThread)t).pool == e) + w = wt.workQueue; else if (e instanceof ForkJoinPool) - w = ((ForkJoinPool)e).externalQueue(); + w = externalQueue((ForkJoinPool)e); if (w != null) w.helpAsyncBlocker(blocker); } @@ -2482,59 +2743,83 @@ public class ForkJoinPool extends AbstractExecutorService { * @param now if true, unconditionally terminate, else only * if no work and no active workers * @param enable if true, terminate when next possible - * @return true if terminating or terminated - */ - private boolean tryTerminate(boolean now, boolean enable) { - int rs; ReentrantLock lock; Condition cond; - if ((rs = runState) >= 0) { // set SHUTDOWN and/or STOP - if ((config & ISCOMMON) != 0) - return false; // cannot shutdown - if (!now) { - if ((rs & SHUTDOWN) == 0) { - if (!enable) - return false; - getAndBitwiseOrRunState(SHUTDOWN); - } - if (!canStop()) - return false; + * @return runState on exit + */ + private int tryTerminate(boolean now, boolean enable) { + int e = runState; + if ((e & STOP) == 0) { + if (now) { + int s = lockRunState(); + runState = e = (s + RS_LOCK) | STOP | SHUTDOWN; + if ((s & STOP) == 0) + interruptAll(); + } + else { + int isShutdown = (e & SHUTDOWN); + if (isShutdown == 0 && enable) + getAndBitwiseOrRunState(isShutdown = SHUTDOWN); + if (isShutdown != 0) + quiescent(); // may trigger STOP + e = runState; } - getAndBitwiseOrRunState(SHUTDOWN | STOP); - } - WorkQueue released = reactivate(); // try signalling waiter - int tc = (short)(ctl >>> TC_SHIFT); - if (released == null && tc > 0) { // help unblock and cancel - Thread current = Thread.currentThread(); - WorkQueue w = ((current instanceof ForkJoinWorkerThread) ? - ((ForkJoinWorkerThread)current).workQueue : null); - int r = (w == null) ? 0 : w.config + 1; // stagger traversals + } + if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks + int r = (int)Thread.currentThread().threadId(); // stagger traversals WorkQueue[] qs = queues; int n = (qs == null) ? 0 : qs.length; - for (int i = 0; i < n; ++i) { - WorkQueue q; Thread thread; - if ((q = qs[(r + i) & (n - 1)]) != null && - (thread = q.owner) != current && q.access != STOP) { - for (ForkJoinTask<?> t; (t = q.poll(null)) != null; ) - ForkJoinTask.cancelIgnoringExceptions(t); - if (thread != null && !thread.isInterrupted()) { - q.forcePhaseActive(); // for awaitWork - try { - thread.interrupt(); - } catch (Throwable ignore) { - } + for (int l = n; l > 0; --l, ++r) { + int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t; + while ((q = qs[j]) != null && q.source != DEREGISTERED && + (t = q.poll(null)) != null) { + try { + t.cancel(false); + } catch (Throwable ignore) { } } } + if (((e = runState) & TERMINATED) == 0 && ctl == 0L) { + if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) { + CountDownLatch done; SharedThreadContainer ctr; + if ((done = termination) != null) + done.countDown(); + if ((ctr = container) != null) + ctr.close(); + } + e = runState; + } } - if ((tc <= 0 || (short)(ctl >>> TC_SHIFT) <= 0) && - (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 && - (lock = registrationLock) != null) { - lock.lock(); // signal when no workers - if ((cond = termination) != null) - cond.signalAll(); - lock.unlock(); - container.close(); + return e; + } + + /** + * Interrupts all workers + */ + private void interruptAll() { + Thread current = Thread.currentThread(); + WorkQueue[] qs = queues; + int n = (qs == null) ? 0 : qs.length; + for (int i = 1; i < n; i += 2) { + WorkQueue q; Thread o; + if ((q = qs[i]) != null && (o = q.owner) != null && o != current && + q.source != DEREGISTERED) { + try { + o.interrupt(); + } catch (Throwable ignore) { + } + } } - return true; + } + + + /** + * Returns termination signal, constructing if necessary + */ + private CountDownLatch terminationSignal() { + CountDownLatch signal, s, u; + if ((signal = termination) == null) + signal = ((u = cmpExTerminationSignal( + s = new CountDownLatch(1))) == null) ? s : u; + return signal; } // Exported methods @@ -2708,19 +2993,17 @@ public class ForkJoinPool extends AbstractExecutorService { throw new IllegalArgumentException(); if (factory == null || unit == null) throw new NullPointerException(); + int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); this.parallelism = p; this.factory = factory; this.ueh = handler; this.saturate = saturate; - this.config = asyncMode ? FIFO : 0; this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP); - int corep = Math.clamp(corePoolSize, p, MAX_CAP); int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP); int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP); - this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) | - ((long)corep << 32); - int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); - this.registrationLock = new ReentrantLock(); + this.config = (((asyncMode ? FIFO : 0) & LMASK) | + (((long)maxSpares) << TC_SHIFT) | + (((long)minAvail) << RC_SHIFT)); this.queues = new WorkQueue[size]; String pid = Integer.toString(getAndAddPoolIds(1) + 1); String name = "ForkJoinPool-" + pid; @@ -2768,14 +3051,13 @@ public class ForkJoinPool extends AbstractExecutorService { int p = Math.min(pc, MAX_CAP); int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1)); this.parallelism = p; - this.config = ISCOMMON | preset; - this.bounds = (long)(1 | (maxSpares << SWIDTH)); + this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) | + (1L << RC_SHIFT)); this.factory = fac; this.ueh = handler; this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; - this.registrationLock = new ReentrantLock(); this.queues = new WorkQueue[size]; this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); } @@ -2818,6 +3100,7 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public <T> T invoke(ForkJoinTask<T> task) { + Objects.requireNonNull(task); poolSubmit(true, task); return task.join(); } @@ -2831,6 +3114,7 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public void execute(ForkJoinTask<?> task) { + Objects.requireNonNull(task); poolSubmit(true, task); } @@ -2864,7 +3148,9 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { - return poolSubmit(true, task); + Objects.requireNonNull(task); + poolSubmit(true, task); + return task; } /** @@ -2874,7 +3160,12 @@ public class ForkJoinPool extends AbstractExecutorService { */ @Override public <T> ForkJoinTask<T> submit(Callable<T> task) { - return poolSubmit(true, new ForkJoinTask.AdaptedCallable<T>(task)); + ForkJoinTask<T> t = + (Thread.currentThread() instanceof ForkJoinWorkerThread) ? + new ForkJoinTask.AdaptedCallable<T>(task) : + new ForkJoinTask.AdaptedInterruptibleCallable<T>(task); + poolSubmit(true, t); + return t; } /** @@ -2884,7 +3175,12 @@ public class ForkJoinPool extends AbstractExecutorService { */ @Override public <T> ForkJoinTask<T> submit(Runnable task, T result) { - return poolSubmit(true, new ForkJoinTask.AdaptedRunnable<T>(task, result)); + ForkJoinTask<T> t = + (Thread.currentThread() instanceof ForkJoinWorkerThread) ? + new ForkJoinTask.AdaptedRunnable<T>(task, result) : + new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result); + poolSubmit(true, t); + return t; } /** @@ -2895,13 +3191,15 @@ public class ForkJoinPool extends AbstractExecutorService { @Override @SuppressWarnings("unchecked") public ForkJoinTask<?> submit(Runnable task) { - return poolSubmit(true, (task instanceof ForkJoinTask<?>) - ? (ForkJoinTask<Void>) task // avoid re-wrap - : new ForkJoinTask.AdaptedRunnableAction(task)); + ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ? + (ForkJoinTask<Void>) task : // avoid re-wrap + ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? + new ForkJoinTask.AdaptedRunnable<Void>(task, null) : + new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)); + poolSubmit(true, f); + return f; } - // Added mainly for possible use in Loom - /** * Submits the given task as if submitted from a non-{@code ForkJoinTask} * client. The task is added to a scheduling queue for submissions to the @@ -2920,10 +3218,8 @@ public class ForkJoinPool extends AbstractExecutorService { * @since 20 */ public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { - U.storeStoreFence(); // ensure safely publishable - task.markPoolSubmission(); - WorkQueue q = submissionQueue(true); - q.push(task, this, true); + Objects.requireNonNull(task); + externalSubmissionQueue().push(task, this, false); return task; } @@ -2944,7 +3240,9 @@ public class ForkJoinPool extends AbstractExecutorService { * @since 19 */ public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) { - return poolSubmit(false, task); + Objects.requireNonNull(task); + poolSubmit(false, task); + return task; } /** @@ -2980,16 +3278,34 @@ public class ForkJoinPool extends AbstractExecutorService { } /** - * @throws NullPointerException {@inheritDoc} - * @throws RejectedExecutionException {@inheritDoc} + * Uninterrupible version of {@code invokeAll}. Executes the given + * tasks, returning a list of Futures holding their status and + * results when all complete, ignoring interrupts. {@link + * Future#isDone} is {@code true} for each element of the returned + * list. Note that a <em>completed</em> task could have + * terminated either normally or by throwing an exception. The + * results of this method are undefined if the given collection is + * modified while this operation is in progress. + * + * @apiNote This method supports usages that previously relied on an + * incompatible override of + * {@link ExecutorService#invokeAll(java.util.Collection)}. + * + * @param tasks the collection of tasks + * @param <T> the type of the values returned from the tasks + * @return a list of Futures representing the tasks, in the same + * sequential order as produced by the iterator for the + * given task list, each of which has completed + * @throws NullPointerException if tasks or any of its elements are {@code null} + * @throws RejectedExecutionException if any task cannot be + * scheduled for execution + * @since 22 */ - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) { ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); try { for (Callable<T> t : tasks) { - ForkJoinTask<T> f = - new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); + ForkJoinTask<T> f = ForkJoinTask.adapt(t); futures.add(f); poolSubmit(true, f); } @@ -2998,139 +3314,66 @@ public class ForkJoinPool extends AbstractExecutorService { return futures; } catch (Throwable t) { for (Future<T> e : futures) - ForkJoinTask.cancelIgnoringExceptions(e); + e.cancel(true); throw t; } } - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, - long timeout, TimeUnit unit) + /** + * Common support for timed and untimed invokeAll + */ + private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long deadline) throws InterruptedException { - long nanos = unit.toNanos(timeout); ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); try { for (Callable<T> t : tasks) { - ForkJoinTask<T> f = - new ForkJoinTask.AdaptedInterruptibleCallable<T>(t); + ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t); futures.add(f); poolSubmit(true, f); } - long startTime = System.nanoTime(), ns = nanos; - boolean timedOut = (ns < 0L); - for (int i = futures.size() - 1; i >= 0; --i) { - ForkJoinTask<T> f = (ForkJoinTask<T>)futures.get(i); - if (!f.isDone()) { - if (!timedOut) - timedOut = !f.quietlyJoin(ns, TimeUnit.NANOSECONDS); - if (timedOut) - ForkJoinTask.cancelIgnoringExceptions(f); - else - ns = nanos - (System.nanoTime() - startTime); - } - } + for (int i = futures.size() - 1; i >= 0; --i) + ((ForkJoinTask<?>)futures.get(i)) + .quietlyJoinPoolInvokeAllTask(deadline); return futures; } catch (Throwable t) { for (Future<T> e : futures) - ForkJoinTask.cancelIgnoringExceptions(e); + e.cancel(true); throw t; } } - // Task to hold results from InvokeAnyTasks - static final class InvokeAnyRoot<E> extends ForkJoinTask<E> { - private static final long serialVersionUID = 2838392045355241008L; - @SuppressWarnings("serial") // Conditionally serializable - volatile E result; - final AtomicInteger count; // in case all throw - @SuppressWarnings("serial") - final ForkJoinPool pool; // to check shutdown while collecting - InvokeAnyRoot(int n, ForkJoinPool p) { - pool = p; - count = new AtomicInteger(n); - } - final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks - Throwable ex = null; - boolean failed; - if (c == null || Thread.interrupted() || - (pool != null && pool.runState < 0)) - failed = true; - else if (isDone()) - failed = false; - else { - try { - complete(c.call()); - failed = false; - } catch (Throwable tx) { - ex = tx; - failed = true; - } - } - if ((pool != null && pool.runState < 0) || - (failed && count.getAndDecrement() <= 1)) - trySetThrown(ex != null ? ex : new CancellationException()); - } - public final boolean exec() { return false; } // never forked - public final E getRawResult() { return result; } - public final void setRawResult(E v) { result = v; } - } - - // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot - static final class InvokeAnyTask<E> extends ForkJoinTask<E> { - private static final long serialVersionUID = 2838392045355241008L; - final InvokeAnyRoot<E> root; - @SuppressWarnings("serial") // Conditionally serializable - final Callable<E> callable; - transient volatile Thread runner; - InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) { - this.root = root; - this.callable = callable; - } - public final boolean exec() { - Thread.interrupted(); - runner = Thread.currentThread(); - root.tryComplete(callable); - runner = null; - Thread.interrupted(); - return true; - } - public final boolean cancel(boolean mayInterruptIfRunning) { - Thread t; - boolean stat = super.cancel(false); - if (mayInterruptIfRunning && (t = runner) != null) { - try { - t.interrupt(); - } catch (Throwable ignore) { - } - } - return stat; - } - public final void setRawResult(E v) {} // unused - public final E getRawResult() { return null; } + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return invokeAll(tasks, 0L); + } + // for jdk version < 22, replace with + // /** + // * @throws NullPointerException {@inheritDoc} + // * @throws RejectedExecutionException {@inheritDoc} + // */ + // @Override + // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + // return invokeAllUninterruptibly(tasks); + // } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) + throws InterruptedException { + return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L); } @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { - int n = tasks.size(); - if (n <= 0) - throw new IllegalArgumentException(); - InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this); - ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n); try { - for (Callable<T> c : tasks) { - if (c == null) - throw new NullPointerException(); - InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c); - fs.add(f); - poolSubmit(true, f); - if (root.isDone()) - break; - } - return root.get(); - } finally { - for (InvokeAnyTask<T> f : fs) - ForkJoinTask.cancelIgnoringExceptions(f); + return new ForkJoinTask.InvokeAnyRoot<T>() + .invokeAny(tasks, this, false, 0L); + } catch (TimeoutException cannotHappen) { + assert false; + return null; } } @@ -3138,27 +3381,8 @@ public class ForkJoinPool extends AbstractExecutorService { public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - long nanos = unit.toNanos(timeout); - int n = tasks.size(); - if (n <= 0) - throw new IllegalArgumentException(); - InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this); - ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n); - try { - for (Callable<T> c : tasks) { - if (c == null) - throw new NullPointerException(); - InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c); - fs.add(f); - poolSubmit(true, f); - if (root.isDone()) - break; - } - return root.get(nanos, TimeUnit.NANOSECONDS); - } finally { - for (InvokeAnyTask<T> f : fs) - ForkJoinTask.cancelIgnoringExceptions(f); - } + return new ForkJoinTask.InvokeAnyRoot<T>() + .invokeAny(tasks, this, true, unit.toNanos(timeout)); } /** @@ -3264,7 +3488,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @return {@code true} if all threads are currently idle */ public boolean isQuiescent() { - return canStop(); + return quiescent(); } /** @@ -3339,7 +3563,14 @@ public class ForkJoinPool extends AbstractExecutorService { * @return {@code true} if there are any queued submissions */ public boolean hasQueuedSubmissions() { - return hasTasks(true); + WorkQueue[] qs; WorkQueue q; + if ((runState & STOP) == 0 && (qs = queues) != null) { + for (int i = 0; i < qs.length; i += 2) { + if ((q = qs[i]) != null && q.queueSize() > 0) + return true; + } + } + return false; } /** @@ -3388,6 +3619,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ public String toString() { // Use a single pass through queues to collect counts + int e = runState; long st = stealCount; long qt = 0L, ss = 0L; int rc = 0; WorkQueue[] qs; WorkQueue q; @@ -3413,10 +3645,9 @@ public class ForkJoinPool extends AbstractExecutorService { int ac = (short)(c >>> RC_SHIFT); if (ac < 0) // ignore transient negative ac = 0; - int rs = runState; - String level = ((rs & TERMINATED) != 0 ? "Terminated" : - (rs & STOP) != 0 ? "Terminating" : - (rs & SHUTDOWN) != 0 ? "Shutting down" : + String level = ((e & TERMINATED) != 0 ? "Terminated" : + (e & STOP) != 0 ? "Terminating" : + (e & SHUTDOWN) != 0 ? "Shutting down" : "Running"); return super.toString() + "[" + level + @@ -3446,7 +3677,8 @@ public class ForkJoinPool extends AbstractExecutorService { */ public void shutdown() { checkPermission(); - tryTerminate(false, true); + if (workerNamePrefix != null) // not common pool + tryTerminate(false, true); } /** @@ -3469,7 +3701,8 @@ public class ForkJoinPool extends AbstractExecutorService { */ public List<Runnable> shutdownNow() { checkPermission(); - tryTerminate(true, true); + if (workerNamePrefix != null) // not common pool + tryTerminate(true, true); return Collections.emptyList(); } @@ -3479,7 +3712,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @return {@code true} if all tasks have completed following shut down */ public boolean isTerminated() { - return (runState & TERMINATED) != 0; + return (tryTerminate(false, false) & TERMINATED) != 0; } /** @@ -3496,7 +3729,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @return {@code true} if terminating but not yet terminated */ public boolean isTerminating() { - return (runState & (STOP | TERMINATED)) == STOP; + return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP; } /** @@ -3505,7 +3738,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @return {@code true} if this pool has been shut down */ public boolean isShutdown() { - return runState != 0; + return (runState & SHUTDOWN) != 0; } /** @@ -3524,30 +3757,19 @@ public class ForkJoinPool extends AbstractExecutorService { */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - ReentrantLock lock; Condition cond; boolean terminated; long nanos = unit.toNanos(timeout); - if ((config & ISCOMMON) != 0) { + CountDownLatch done; + if (workerNamePrefix == null) { // is common pool if (helpQuiescePool(this, nanos, true) < 0) throw new InterruptedException(); - terminated = false; - } - else if (!(terminated = ((runState & TERMINATED) != 0))) { - tryTerminate(false, false); // reduce transient blocking - if ((lock = registrationLock) != null && - !(terminated = (((runState & TERMINATED) != 0)))) { - lock.lock(); - try { - if ((cond = termination) == null) - termination = cond = lock.newCondition(); - while (!(terminated = ((runState & TERMINATED) != 0)) && - nanos > 0L) - nanos = cond.awaitNanos(nanos); - } finally { - lock.unlock(); - } - } + return false; } - return terminated; + else if ((tryTerminate(false, false) & TERMINATED) != 0 || + (done = terminationSignal()) == null || + (runState & TERMINATED) != 0) + return true; + else + return done.await(nanos, TimeUnit.NANOSECONDS); } /** @@ -3591,25 +3813,24 @@ public class ForkJoinPool extends AbstractExecutorService { */ @Override public void close() { - if ((config & ISCOMMON) == 0) { - boolean terminated = tryTerminate(false, false); - if (!terminated) { - shutdown(); - boolean interrupted = false; - while (!terminated) { + if (workerNamePrefix != null) { + checkPermission(); + CountDownLatch done = null; + boolean interrupted = false; + while ((tryTerminate(interrupted, true) & TERMINATED) == 0) { + if (done == null) + done = terminationSignal(); + else { try { - terminated = awaitTermination(1L, TimeUnit.DAYS); - } catch (InterruptedException e) { - if (!interrupted) { - shutdownNow(); - interrupted = true; - } + done.await(); + break; + } catch (InterruptedException ex) { + interrupted = true; } } - if (interrupted) { - Thread.currentThread().interrupt(); - } } + if (interrupted) + Thread.currentThread().interrupt(); } } @@ -3728,18 +3949,20 @@ public class ForkJoinPool extends AbstractExecutorService { /** ManagedBlock for ForkJoinWorkerThreads */ private void compensatedBlock(ManagedBlocker blocker) throws InterruptedException { - if (blocker == null) throw new NullPointerException(); + Objects.requireNonNull(blocker); for (;;) { int comp; boolean done; long c = ctl; if (blocker.isReleasable()) break; - if ((comp = tryCompensate(c, false)) >= 0) { - long post = (comp == 0) ? 0L : RC_UNIT; + if ((runState & STOP) != 0) + throw new InterruptedException(); + if ((comp = tryCompensate(c)) >= 0) { try { done = blocker.block(); } finally { - getAndAddCtl(post); + if (comp > 0) + getAndAddCtl(RC_UNIT); } if (done) break; @@ -3753,22 +3976,17 @@ public class ForkJoinPool extends AbstractExecutorService { * blocking operation is done then endCompensatedBlock must be invoked * with the value returned by this method to re-adjust the parallelism. */ - private long beginCompensatedBlock() { - for (;;) { - int comp; - if ((comp = tryCompensate(ctl, false)) >= 0) { - return (comp == 0) ? 0L : RC_UNIT; - } else { - Thread.onSpinWait(); - } - } + final long beginCompensatedBlock() { + int c; + do {} while ((c = tryCompensate(ctl)) < 0); + return (c == 0) ? 0L : RC_UNIT; } /** * Re-adjusts parallelism after a blocking operation completes. */ void endCompensatedBlock(long post) { - if (post > 0) { + if (post > 0L) { getAndAddCtl(post); } } @@ -3776,22 +3994,22 @@ public class ForkJoinPool extends AbstractExecutorService { /** ManagedBlock for external threads */ private static void unmanagedBlock(ManagedBlocker blocker) throws InterruptedException { - if (blocker == null) throw new NullPointerException(); + Objects.requireNonNull(blocker); do {} while (!blocker.isReleasable() && !blocker.block()); } - // AbstractExecutorService.newTaskFor overrides rely on - // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks - // that also implement RunnableFuture. - @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { - return new ForkJoinTask.AdaptedRunnable<T>(runnable, value); + return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? + new ForkJoinTask.AdaptedRunnable<T>(runnable, value) : + new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return new ForkJoinTask.AdaptedCallable<T>(callable); + return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? + new ForkJoinTask.AdaptedCallable<T>(callable) : + new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable); } static { @@ -3806,8 +4024,15 @@ public class ForkJoinPool extends AbstractExecutorService { } CTL = U.objectFieldOffset(klass, "ctl"); RUNSTATE = U.objectFieldOffset(klass, "runState"); - PARALLELISM = U.objectFieldOffset(klass, "parallelism"); + PARALLELISM = U.objectFieldOffset(klass, "parallelism"); THREADIDS = U.objectFieldOffset(klass, "threadIds"); + TERMINATION = U.objectFieldOffset(klass, "termination"); + Class<ForkJoinTask[]> aklass = ForkJoinTask[].class; + ABASE = U.arrayBaseOffset(aklass); + int scale = U.arrayIndexScale(aklass); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); + if ((scale & (scale - 1)) != 0) + throw new Error("array index scale not a power of two"); defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index e0737cde89d..8706359cda2 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -39,6 +39,7 @@ import java.io.Serializable; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.RandomAccess; import java.util.concurrent.locks.LockSupport; import jdk.internal.misc.Unsafe; @@ -208,20 +209,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * See the internal documentation of class ForkJoinPool for a * general implementation overview. ForkJoinTasks are mainly * responsible for maintaining their "status" field amidst relays - * to methods in ForkJoinWorkerThread and ForkJoinPool. + * to methods in ForkJoinWorkerThread and ForkJoinPool, along with + * recording and reporting exceptions. The status field mainly + * holds bits recording completion status. Note that there is no + * status bit representing "running", recording whether incomplete + * tasks are queued vs executing. However these cases can be + * distinguished in subclasses of InterruptibleTask that adds this + * capability by recording the running thread. Cancellation is + * recorded in status bits (ABNORMAL but not THROWN), but reported + * in joining methods by throwing an exception. Other exceptions + * of completed (THROWN) tasks are recorded in the "aux" field, + * but are reconstructed (in getException) to produce more useful + * stack traces when reported. Sentinels for interruptions or + * timeouts while waiting for completion are not recorded as + * status bits but are included in return values of methods in + * which they occur. * * The methods of this class are more-or-less layered into * (1) basic status maintenance * (2) execution and awaiting completion * (3) user-level methods that additionally report results. + * (4) Subclasses for adaptors and internal usages * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. - * - * Revision notes: This class uses jdk-internal Unsafe for atomics - * and special memory modes, rather than VarHandles, to avoid - * initialization dependencies in other jdk components that - * require early parallelism. It also simplifies handling of - * pool-submitted tasks, among other minor improvements. */ /** @@ -231,9 +241,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * waiters. Cancelled waiters try to unsplice. */ static final class Aux { - final Thread thread; - final Throwable ex; // null if a waiter - Aux next; // accessed only via memory-acquire chains + Thread thread; // thrower or waiter + final Throwable ex; + Aux next; // accessed only via memory-acquire chains Aux(Thread thread, Throwable ex) { this.thread = thread; this.ex = ex; @@ -259,17 +269,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * control bits occupy only (some of) the upper half (16 bits) of * status field. The lower bits are used for user-defined tags. */ - static final int DONE = 1 << 31; // must be negative - static final int ABNORMAL = 1 << 16; - static final int THROWN = 1 << 17; - static final int SMASK = 0xffff; // short bits for tags - static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel - static final int POOLSUBMIT = 1 << 18; // for pool.submit vs fork - - // flags for awaitDone (in addition to above) - static final int RAN = 1; - static final int INTERRUPTIBLE = 2; - static final int TIMED = 4; + static final int DONE = 1 << 31; // must be negative + static final int ABNORMAL = 1 << 16; + static final int THROWN = 1 << 17; + static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN; + static final int MARKER = 1 << 30; // utility marker + static final int SMASK = 0xffff; // short bits for tags + static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel // Fields volatile int status; // accessed directly by pool and workers @@ -285,25 +291,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { private boolean casStatus(int c, int v) { return U.compareAndSetInt(this, STATUS, c, v); } + + // Support for waiting and signalling + private boolean casAux(Aux c, Aux v) { return U.compareAndSetReference(this, AUX, c, v); } - - /** - * Marks this task as an external pool submission. - */ - final void markPoolSubmission() { - getAndBitwiseOrStatus(POOLSUBMIT); + private Aux compareAndExchangeAux(Aux c, Aux v) { + return (Aux)U.compareAndExchangeReference(this, AUX, c, v); } - /** Removes and unparks waiters */ private void signalWaiters() { - for (Aux a; (a = aux) != null && a.ex == null; ) { - if (casAux(a, null)) { // detach entire list - for (Thread t; a != null; a = a.next) { - if ((t = a.thread) != Thread.currentThread() && t != null) - LockSupport.unpark(t); // don't self-signal - } + for (Aux a = aux;;) { + if (a == null || a.ex != null) + break; + if (a == (a = compareAndExchangeAux(a, null))) { + do { // detach entire list + LockSupport.unpark(a.thread); + } while ((a = a.next) != null); break; } } @@ -311,23 +316,27 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { /** * Sets DONE status and wakes up threads waiting to join this task. - * @return status on exit */ - private int setDone() { - int s = getAndBitwiseOrStatus(DONE) | DONE; + private void setDone() { + getAndBitwiseOrStatus(DONE); signalWaiters(); - return s; } /** * Sets ABNORMAL DONE status unless already done, and wakes up threads * waiting to join this task. - * @return status on exit + * @return previous status */ - private int trySetCancelled() { + final int trySetCancelled() { int s; - do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL))); - signalWaiters(); + for (;;) { + if ((s = status) < 0) + break; + if (casStatus(s, s | (DONE | ABNORMAL))) { + signalWaiters(); + break; + } + } return s; } @@ -337,175 +346,175 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * If losing a race with setDone or trySetCancelled, the exception * may be recorded but not reported. * - * @return status on exit + * @return true if set */ - final int trySetThrown(Throwable ex) { - Aux h = new Aux(Thread.currentThread(), ex), p = null; - boolean installed = false; + final boolean trySetThrown(Throwable ex) { int s; - while ((s = status) >= 0) { - Aux a; - if (!installed && ((a = aux) == null || a.ex == null) && - (installed = casAux(a, h))) - p = a; // list of waiters replaced by h - if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN))) - break; + boolean set = false, installed = false; + if ((s = status) >= 0) { + Aux a, p = null, h = new Aux(Thread.currentThread(), ex); + do { + if (!installed && ((a = aux) == null || a.ex == null) && + (installed = casAux(a, h))) + p = a; // list of waiters replaced by h + if (installed && (set = casStatus(s, s | HAVE_EXCEPTION))) + break; + } while ((s = status) >= 0); + for (; p != null; p = p.next) + LockSupport.unpark(p.thread); } - for (; p != null; p = p.next) - LockSupport.unpark(p.thread); - return s; + return set; } /** - * Records exception unless already done. Overridable in subclasses. - * - * @return status on exit + * Overridable action on setting exception */ - int trySetException(Throwable ex) { - return trySetThrown(ex); + void onAuxExceptionSet(Throwable ex) { } /** - * Constructor for subclasses to call. + * Tries to set exception, if so invoking onAuxExceptionSet */ - public ForkJoinTask() {} - - static boolean isExceptionalStatus(int s) { // needed by subclasses - return (s & THROWN) != 0; + final void trySetException(Throwable ex) { + if (trySetThrown(ex)) + onAuxExceptionSet(ex); } - /** - * Unless done, calls exec and records status if completed, but - * doesn't wait for completion otherwise. + /* + * Waits for signal, interrupt, timeout, or pool termination. * - * @return status on exit from this method - */ - final int doExec() { - int s; boolean completed; + * @param pool if nonnull, the pool of ForkJoinWorkerThread caller + * @param compensation result from a helping method + * @param interruptible if wait is interruptible + * @param deadline if nonzero, timeout deadline + * @return ABNORMAL if interrupted, 0 on timeout, else status on exit + */ + private int awaitDone(ForkJoinPool pool, int compensation, + boolean interruptible, long deadline) { + int s; if ((s = status) >= 0) { - try { - completed = exec(); - } catch (Throwable rex) { - s = trySetException(rex); - completed = false; - } - if (completed) - s = setDone(); - } - return s; - } - - /** - * Helps and/or waits for completion from join, get, or invoke; - * called from either internal or external threads. - * - * @param how flags for POOLSUBMIT, RAN, INTERRUPTIBLE, TIMED - * @param deadline if timed, timeout deadline - * @return ABNORMAL if interrupted, else status on exit - */ - private int awaitDone(int how, long deadline) { - int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool p; - ForkJoinPool.WorkQueue q = null; - boolean timed = (how & TIMED) != 0; - boolean owned = false, uncompensate = false; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - owned = true; - q = (wt = (ForkJoinWorkerThread)t).workQueue; - p = wt.pool; - } - else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0) - q = p.externalQueue(); - if (q != null && p != null) { // try helping - if (this instanceof CountedCompleter) - s = p.helpComplete(this, q, owned, timed); - else if ((how & RAN) != 0 || - (s = q.tryRemoveAndExec(this, owned)) >= 0) - s = (owned) ? p.helpJoin(this, q, timed) : 0; - if (s < 0) - return s; - if (s == UNCOMPENSATE) - uncompensate = true; - } - Aux node = null; - long ns = 0L; - boolean interrupted = false, queued = false; - for (;;) { // install node and await signal - Aux a; - if ((s = status) < 0) - break; - else if (node == null) + Aux node = null; + try { // spinwait if out of memory node = new Aux(Thread.currentThread(), null); - else if (!queued) { - if (((a = aux) == null || a.ex == null) && - (queued = casAux(node.next = a, node))) - LockSupport.setCurrentBlocker(this); + } catch (OutOfMemoryError ex) { } - else if (timed && (ns = deadline - System.nanoTime()) <= 0) { - s = 0; - break; - } - else if (Thread.interrupted()) { - interrupted = true; - if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0) - cancelIgnoringExceptions(this); // cancel on shutdown - else if ((how & INTERRUPTIBLE) != 0) { - s = ABNORMAL; + boolean queued = false; + for (Aux a;;) { // try to install node + if ((s = status) < 0) + break; + else if (node == null) + Thread.onSpinWait(); + else if (((a = aux) == null || a.ex == null) && + (queued = casAux(node.next = a, node))) break; - } } - else if ((s = status) < 0) // recheck - break; - else if (timed) - LockSupport.parkNanos(ns); - else - LockSupport.park(); - } - if (uncompensate) - p.uncompensate(); - - if (queued) { - LockSupport.setCurrentBlocker(null); - if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer - outer: for (Aux a; (a = aux) != null && a.ex == null; ) { - for (Aux trail = null;;) { + if (queued) { // await signal or interrupt + LockSupport.setCurrentBlocker(this); + int interrupts = 0; // < 0 : throw; > 0 : re-interrupt + for (;;) { + if ((s = status) < 0) + break; + else if (interrupts < 0) { + s = ABNORMAL; // interrupted and not done + break; + } + else if (Thread.interrupted()) { + if (!ForkJoinPool.poolIsStopping(pool)) + interrupts = interruptible ? -1 : 1; + else { + interrupts = 1; // re-assert if cleared + try { + cancel(true); + } catch (Throwable ignore) { + } + } + } + else if (deadline != 0L) { + long ns; + if ((ns = deadline - System.nanoTime()) <= 0) { + s = 0; + break; + } + LockSupport.parkNanos(ns); + } + else + LockSupport.park(); + } + node.thread = null; // help clean aux; raciness OK + clean: for (Aux a;;) { // remove node if still present + if ((a = aux) == null || a.ex != null) + break; + for (Aux prev = null;;) { Aux next = a.next; if (a == node) { - if (trail != null) - trail.casNext(trail, next); + if (prev != null) + prev.casNext(prev, next); else if (casAux(a, next)) - break outer; // cannot be re-encountered - break; // restart - } else { - trail = a; - if ((a = next) == null) - break outer; + break clean; + break; // check for failed or stale CAS } + prev = a; + if ((a = next) == null) + break clean; // not found } } - } - else { - signalWaiters(); // help clean or signal - if (interrupted) + LockSupport.setCurrentBlocker(null); + if (interrupts > 0) Thread.currentThread().interrupt(); } } + if (compensation == UNCOMPENSATE && pool != null) + pool.uncompensate(); return s; } /** - * Cancels, ignoring any exceptions thrown by cancel. Cancel is - * spec'ed not to throw any exceptions, but if it does anyway, we - * have no recourse, so guard against this case. + * Tries applicable helping steps while joining this task, + * otherwise invokes blocking version of awaitDone. Called only + * when pre-checked not to be done, and pre-screened for + * interrupts and timeouts, if applicable. + * + * @param interruptible if wait is interruptible + * @param deadline if nonzero, timeout deadline + * @return ABNORMAL if interrupted, else status on exit */ - static final void cancelIgnoringExceptions(Future<?> t) { - if (t != null) { + private int awaitDone(boolean interruptible, long deadline) { + ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q; + Thread t; boolean internal; int s; + if (internal = + (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + p = (wt = (ForkJoinWorkerThread)t).pool; + q = wt.workQueue; + } + else + q = ForkJoinPool.externalQueue(p = ForkJoinPool.common); + return (((s = (p == null) ? 0 : + ((this instanceof CountedCompleter) ? + p.helpComplete(this, q, internal) : + (this instanceof InterruptibleTask) && !internal ? status : + p.helpJoin(this, q, internal))) < 0)) ? s : + awaitDone(internal ? p : null, s, interruptible, deadline); + } + + /** + * Runs a task body: Unless done, calls exec and records status if + * completed, but doesn't wait for completion otherwise. + */ + final void doExec() { + if (status >= 0) { + boolean completed = false; try { - t.cancel(true); - } catch (Throwable ignore) { + completed = exec(); + } catch (Throwable rex) { + trySetException(rex); } + if (completed) + setDone(); } } + // Reporting Exceptions + /** * Returns a rethrowable exception for this task, if available. * To provide accurate stack traces, if the exception was not @@ -518,13 +527,19 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * still correct, although it may contain a misleading stack * trace. * + * @param asExecutionException true if wrap as ExecutionException * @return the exception, or null if none */ - private Throwable getThrowableException() { - Throwable ex; Aux a; - if ((a = aux) == null) - ex = null; - else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) { + private Throwable getException(boolean asExecutionException) { + int s; Throwable ex; Aux a; + if ((s = status) >= 0 || (s & ABNORMAL) == 0) + return null; + else if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) { + ex = new CancellationException(); + if (!asExecutionException || !(this instanceof InterruptibleTask)) + return ex; // else wrap below + } + else if (a.thread != Thread.currentThread()) { try { Constructor<?> noArgCtor = null, oneArgCtor = null; for (Constructor<?> c : ex.getClass().getConstructors()) { @@ -546,40 +561,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } catch (Exception ignore) { } } - return ex; - } - - /** - * Returns exception associated with the given status, or null if none. - */ - private Throwable getException(int s) { - Throwable ex = null; - if ((s & ABNORMAL) != 0 && (ex = getThrowableException()) == null) - ex = new CancellationException(); - return ex; - } - - /** - * Throws exception associated with the given status, or - * CancellationException if none recorded. - */ - private void reportException(int s) { - ForkJoinTask.<RuntimeException>uncheckedThrow(getThrowableException()); + return (asExecutionException) ? new ExecutionException(ex) : ex; } /** - * Throws exception for (timed or untimed) get, wrapping if - * necessary in an ExecutionException. + * Throws thrown exception, or CancellationException if none + * recorded. */ - private void reportExecutionException(int s) { - Throwable ex = null, rx; - if (s == ABNORMAL) - ex = new InterruptedException(); - else if (s >= 0) - ex = new TimeoutException(); - else if ((rx = getThrowableException()) != null) - ex = new ExecutionException(rx); - ForkJoinTask.<RuntimeException>uncheckedThrow(ex); + private void reportException(boolean asExecutionException) { + ForkJoinTask.<RuntimeException> + uncheckedThrow(getException(asExecutionException)); } /** @@ -603,9 +594,30 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { throw (T)t; // rely on vacuous cast } + // Utilities shared among ForkJoinTask, ForkJoinPool + + /** + * Sets MARKER bit, returning nonzero if previously set + */ + final int setForkJoinTaskStatusMarkerBit() { + return getAndBitwiseOrStatus(MARKER) & MARKER; + } + + /** + * Returns nonzero if MARKER bit set. + */ + final int getForkJoinTaskStatusMarkerBit() { + return status & MARKER; + } + // public methods /** + * Constructor for subclasses to call. + */ + public ForkJoinTask() {} + + /** * Arranges to asynchronously execute this task in the pool the * current task is running in, if applicable, or using the {@link * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While @@ -622,15 +634,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { */ public final ForkJoinTask<V> fork() { Thread t; ForkJoinWorkerThread wt; - ForkJoinPool p; ForkJoinPool.WorkQueue q; - U.storeStoreFence(); // ensure safely publishable - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - p = (wt = (ForkJoinWorkerThread)t).pool; - q = wt.workQueue; + ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal; + if (internal = + (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + q = (wt = (ForkJoinWorkerThread)t).workQueue; + p = wt.pool; } else - q = (p = ForkJoinPool.common).submissionQueue(false); - q.push(this, p, true); + q = (p = ForkJoinPool.common).externalSubmissionQueue(); + q.push(this, p, internal); return this; } @@ -647,10 +659,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { */ public final V join() { int s; - if ((s = status) >= 0) - s = awaitDone(s & POOLSUBMIT, 0L); - if ((s & ABNORMAL) != 0) - reportException(s); + if ((((s = status) < 0 ? s : awaitDone(false, 0L)) & ABNORMAL) != 0) + reportException(false); return getRawResult(); } @@ -663,12 +673,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return the computed result */ public final V invoke() { - int s; - if ((s = doExec()) >= 0) - s = awaitDone(RAN, 0L); - if ((s & ABNORMAL) != 0) - reportException(s); - return getRawResult(); + doExec(); + return join(); } /** @@ -693,18 +699,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { if (t1 == null || t2 == null) throw new NullPointerException(); t2.fork(); - if ((s1 = t1.doExec()) >= 0) - s1 = t1.awaitDone(RAN, 0L); - if ((s1 & ABNORMAL) != 0) { - cancelIgnoringExceptions(t2); - t1.reportException(s1); - } - else { - if ((s2 = t2.status) >= 0) - s2 = t2.awaitDone(0, 0L); - if ((s2 & ABNORMAL) != 0) - t2.reportException(s2); + t1.doExec(); + if ((((s1 = t1.status) < 0 ? s1 : + t1.awaitDone(false, 0L)) & ABNORMAL) != 0) { + t2.cancel(false); + t1.reportException(false); } + else if ((((s2 = t2.status) < 0 ? s2 : + t2.awaitDone(false, 0L)) & ABNORMAL) != 0) + t2.reportException(false); } /** @@ -726,36 +729,36 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { - ForkJoinTask<?> t; + ForkJoinTask<?> t; int s; if ((t = tasks[i]) == null) { ex = new NullPointerException(); break; } if (i == 0) { - int s; - if ((s = t.doExec()) >= 0) - s = t.awaitDone(RAN, 0L); - if ((s & ABNORMAL) != 0) - ex = t.getException(s); + t.doExec(); + if ((((s = t.status) < 0 ? s : + t.awaitDone(false, 0L)) & ABNORMAL) != 0) + ex = t.getException(false); break; } t.fork(); } if (ex == null) { for (int i = 1; i <= last; ++i) { - ForkJoinTask<?> t; - if ((t = tasks[i]) != null) { - int s; - if ((s = t.status) >= 0) - s = t.awaitDone(0, 0L); - if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) - break; - } + ForkJoinTask<?> t; int s; + if ((t = tasks[i]) != null && + ((((s = t.status) < 0 ? s : + t.awaitDone(false, 0L)) & ABNORMAL) != 0) && + (ex = t.getException(false)) != null) + break; } } if (ex != null) { - for (int i = 1; i <= last; ++i) - cancelIgnoringExceptions(tasks[i]); + for (int i = 1; i <= last; ++i) { + ForkJoinTask<?> t; + if ((t = tasks[i]) != null) + t.cancel(false); + } rethrow(ex); } } @@ -789,36 +792,36 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { Throwable ex = null; int last = ts.size() - 1; // nearly same as array version for (int i = last; i >= 0; --i) { - ForkJoinTask<?> t; + ForkJoinTask<?> t; int s; if ((t = ts.get(i)) == null) { ex = new NullPointerException(); break; } if (i == 0) { - int s; - if ((s = t.doExec()) >= 0) - s = t.awaitDone(RAN, 0L); - if ((s & ABNORMAL) != 0) - ex = t.getException(s); + t.doExec(); + if ((((s = t.status) < 0 ? s : + t.awaitDone(false, 0L)) & ABNORMAL) != 0) + ex = t.getException(false); break; } t.fork(); } if (ex == null) { for (int i = 1; i <= last; ++i) { - ForkJoinTask<?> t; - if ((t = ts.get(i)) != null) { - int s; - if ((s = t.status) >= 0) - s = t.awaitDone(0, 0L); - if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) - break; - } + ForkJoinTask<?> t; int s; + if ((t = ts.get(i)) != null && + ((((s = t.status) < 0 ? s : + t.awaitDone(false, 0L)) & ABNORMAL) != 0) && + (ex = t.getException(false)) != null) + break; } } if (ex != null) { - for (int i = 1; i <= last; ++i) - cancelIgnoringExceptions(ts.get(i)); + for (int i = 1; i <= last; ++i) { + ForkJoinTask<?> t; + if ((t = ts.get(i)) != null) + t.cancel(false); + } rethrow(ex); } return tasks; @@ -852,7 +855,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { - return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL; + int s = trySetCancelled(); + return (s >= 0 || (s & (ABNORMAL | THROWN)) == ABNORMAL); } public final boolean isDone() { @@ -894,16 +898,25 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { @Override public V resultNow() { - if (!isCompletedNormally()) - throw new IllegalStateException(); + int s = status; + if ((s & DONE) == 0) + throw new IllegalStateException("Task has not completed"); + if ((s & ABNORMAL) != 0) { + if ((s & THROWN) != 0) + throw new IllegalStateException("Task completed with exception"); + else + throw new IllegalStateException("Task was cancelled"); + } return getRawResult(); } @Override public Throwable exceptionNow() { - if ((status & (ABNORMAL | THROWN)) != (ABNORMAL | THROWN)) + Throwable ex; + if ((status & HAVE_EXCEPTION) != HAVE_EXCEPTION || + (ex = getException(false)) == null) throw new IllegalStateException(); - return getThrowableException(); + return ex; } /** @@ -914,7 +927,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return the exception, or {@code null} if none */ public final Throwable getException() { - return getException(status); + return getException(false); } /** @@ -984,13 +997,14 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { - int s; - if (Thread.interrupted()) - s = ABNORMAL; - else if ((s = status) >= 0) - s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE, 0L); - if ((s & ABNORMAL) != 0) - reportExecutionException(s); + int stat = status; + int s = ((stat < 0) ? stat : + (Thread.interrupted()) ? ABNORMAL : + awaitDone(true, 0L)); + if (s == ABNORMAL) + throw new InterruptedException(); + else if ((s & ABNORMAL) != 0) + reportException(true); return getRawResult(); } @@ -1011,14 +1025,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long nanos = unit.toNanos(timeout); - int s; - if (Thread.interrupted()) - s = ABNORMAL; - else if ((s = status) >= 0 && nanos > 0L) - s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED, - nanos + System.nanoTime()); - if (s >= 0 || (s & ABNORMAL) != 0) - reportExecutionException(s); + int stat = status; + int s = ((stat < 0) ? stat : + (Thread.interrupted()) ? ABNORMAL : + (nanos <= 0L) ? 0 : + awaitDone(true, (System.nanoTime() + nanos) | 1L)); + if (s == ABNORMAL) + throw new InterruptedException(); + else if (s >= 0) + throw new TimeoutException(); + else if ((s & ABNORMAL) != 0) + reportException(true); return getRawResult(); } @@ -1029,9 +1046,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * known to have aborted. */ public final void quietlyJoin() { - int s; - if ((s = status) >= 0) - awaitDone(s & POOLSUBMIT, 0L); + if (status >= 0) + awaitDone(false, 0L); } /** @@ -1040,9 +1056,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * exception. */ public final void quietlyInvoke() { - int s; - if ((s = doExec()) >= 0) - awaitDone(RAN, 0L); + doExec(); + if (status >= 0) + awaitDone(false, 0L); } /** @@ -1059,17 +1075,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { */ public final boolean quietlyJoin(long timeout, TimeUnit unit) throws InterruptedException { - int s; long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - s = ABNORMAL; - else if ((s = status) >= 0 && nanos > 0L) - s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED, - nanos + System.nanoTime()); + int stat = status; + int s = ((stat < 0) ? stat : + (Thread.interrupted()) ? ABNORMAL : + (nanos <= 0L) ? 0 : + awaitDone(true, (System.nanoTime() + nanos) | 1L)); if (s == ABNORMAL) throw new InterruptedException(); - else - return (s < 0); + return (s < 0); } /** @@ -1086,11 +1100,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { int s; long nanos = unit.toNanos(timeout); if ((s = status) >= 0 && nanos > 0L) - s = awaitDone((s & POOLSUBMIT) | TIMED, nanos + System.nanoTime()); + s = awaitDone(false, (System.nanoTime() + nanos) | 1L); return (s < 0); } /** + * Utility for possibly-timed ForkJoinPool.invokeAll + */ + final void quietlyJoinPoolInvokeAllTask(long deadline) + throws InterruptedException { + int s; + if ((s = status) >= 0) { + if (Thread.interrupted()) + s = ABNORMAL; + else if (deadline == 0L || deadline - System.nanoTime() > 0L) + s = awaitDone(true, deadline); + if (s == ABNORMAL) + throw new InterruptedException(); + else if (s >= 0) + cancel(true); + } + } + + /** * Possibly executes tasks until the pool hosting the current task * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This * method may be of use in designs in which many tasks are forked, @@ -1160,12 +1192,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { * @return {@code true} if unforked */ public boolean tryUnfork() { - Thread t; ForkJoinPool.WorkQueue q; boolean owned; - if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + Thread t; ForkJoinPool.WorkQueue q; boolean internal; + if (internal = + (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else q = ForkJoinPool.commonQueue(); - return (q != null && q.tryUnpush(this, owned)); + return (q != null && q.tryUnpush(this, internal)); } /** @@ -1361,6 +1394,147 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } } + // Factory methods for adaptors below + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code run} + * method of the given {@code Runnable} as its action, and returns + * a null result upon {@link #join}. + * + * @param runnable the runnable action + * @return the task + */ + public static ForkJoinTask<?> adapt(Runnable runnable) { + return new AdaptedRunnableAction(runnable); + } + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code run} + * method of the given {@code Runnable} as its action, and returns + * the given result upon {@link #join}. + * + * @param runnable the runnable action + * @param result the result upon completion + * @param <T> the type of the result + * @return the task + */ + public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { + return new AdaptedRunnable<T>(runnable, result); + } + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code call} + * method of the given {@code Callable} as its action, and returns + * its result upon {@link #join}, translating any checked exceptions + * encountered into {@code RuntimeException}. + * + * @param callable the callable action + * @param <T> the type of the callable's result + * @return the task + */ + public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { + return new AdaptedCallable<T>(callable); + } + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code call} + * method of the given {@code Callable} as its action, and returns + * its result upon {@link #join}, translating any checked exceptions + * encountered into {@code RuntimeException}. Additionally, + * invocations of {@code cancel} with {@code mayInterruptIfRunning + * true} will attempt to interrupt the thread performing the task. + * + * @param callable the callable action + * @param <T> the type of the callable's result + * @return the task + * + * @since 19 + */ + public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) { + return new AdaptedInterruptibleCallable<T>(callable); + } + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code run} + * method of the given {@code Runnable} as its action, and returns + * the given result upon {@link #join}, translating any checked exceptions + * encountered into {@code RuntimeException}. Additionally, + * invocations of {@code cancel} with {@code mayInterruptIfRunning + * true} will attempt to interrupt the thread performing the task. + * + * @param runnable the runnable action + * @param result the result upon completion + * @param <T> the type of the result + * @return the task + * + * @since 22 + */ + public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result) { + return new AdaptedInterruptibleRunnable<T>(runnable, result); + } + + /** + * Returns a new {@code ForkJoinTask} that performs the {@code + * run} method of the given {@code Runnable} as its action, and + * returns null upon {@link #join}, translating any checked + * exceptions encountered into {@code RuntimeException}. + * Additionally, invocations of {@code cancel} with {@code + * mayInterruptIfRunning true} will attempt to interrupt the + * thread performing the task. + * + * @param runnable the runnable action + * @return the task + * + * @since 22 + */ + public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) { + return new AdaptedInterruptibleRunnable<Void>(runnable, null); + } + + // Serialization support + + private static final long serialVersionUID = -7721805057305804111L; + + /** + * Saves this task to a stream (that is, serializes it). + * + * @param s the stream + * @throws java.io.IOException if an I/O error occurs + * @serialData the current run status and the exception thrown + * during execution, or {@code null} if none + */ + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + Aux a; + s.defaultWriteObject(); + s.writeObject((a = aux) == null ? null : a.ex); + } + + /** + * Reconstitutes this task from a stream (that is, deserializes it). + * @param s the stream + * @throws ClassNotFoundException if the class of a serialized object + * could not be found + * @throws java.io.IOException if an I/O error occurs + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + Object ex = s.readObject(); + if (ex != null) + aux = new Aux(Thread.currentThread(), (Throwable)ex); + } + + static { + U = Unsafe.getUnsafe(); + STATUS = U.objectFieldOffset(ForkJoinTask.class, "status"); + AUX = U.objectFieldOffset(ForkJoinTask.class, "aux"); + Class<?> dep1 = LockSupport.class; // ensure loaded + Class<?> dep2 = Aux.class; + } + + // Special subclasses for adaptors and internal tasks + /** * Adapter for Runnables. This implements RunnableFuture * to be compliant with AbstractExecutorService constraints @@ -1373,7 +1547,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { @SuppressWarnings("serial") // Conditionally serializable T result; AdaptedRunnable(Runnable runnable, T result) { - if (runnable == null) throw new NullPointerException(); + Objects.requireNonNull(runnable); this.runnable = runnable; this.result = result; // OK to set this even before completion } @@ -1395,7 +1569,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { @SuppressWarnings("serial") // Conditionally serializable final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); + Objects.requireNonNull(runnable); this.runnable = runnable; } public final Void getRawResult() { return null; } @@ -1409,34 +1583,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { } /** - * Adapter for Runnables in which failure forces worker exception. - */ - static final class RunnableExecuteAction extends ForkJoinTask<Void> { - @SuppressWarnings("serial") // Conditionally serializable - final Runnable runnable; - RunnableExecuteAction(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); - this.runnable = runnable; - } - public final Void getRawResult() { return null; } - public final void setRawResult(Void v) { } - public final boolean exec() { runnable.run(); return true; } - int trySetException(Throwable ex) { // if a handler, invoke it - int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h; - if (isExceptionalStatus(s = trySetThrown(ex)) && - (h = ((t = Thread.currentThread()). - getUncaughtExceptionHandler())) != null) { - try { - h.uncaughtException(t, ex); - } catch (Throwable ignore) { - } - } - return s; - } - private static final long serialVersionUID = 5232453952276885070L; - } - - /** * Adapter for Callables. */ static final class AdaptedCallable<T> extends ForkJoinTask<T> @@ -1446,7 +1592,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { @SuppressWarnings("serial") // Conditionally serializable T result; AdaptedCallable(Callable<? extends T> callable) { - if (callable == null) throw new NullPointerException(); + Objects.requireNonNull(callable); this.callable = callable; } public final T getRawResult() { return result; } @@ -1468,152 +1614,238 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable { private static final long serialVersionUID = 2838392045355241008L; } - static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T> + /** + * Tasks with semantics conforming to ExecutorService conventions. + * Tasks are interruptible when cancelled, including cases of + * cancellation upon pool termination. In addition to recording + * the running thread to enable interrupt in cancel(true), the + * task checks for termination before executing the compute + * method, to cover shutdown races in which the task has not yet + * been cancelled on entry and might not otherwise be cancelled by + * others. + */ + static abstract class InterruptibleTask<T> extends ForkJoinTask<T> implements RunnableFuture<T> { - @SuppressWarnings("serial") // Conditionally serializable - final Callable<? extends T> callable; transient volatile Thread runner; - @SuppressWarnings("serial") // Conditionally serializable - T result; - AdaptedInterruptibleCallable(Callable<? extends T> callable) { - if (callable == null) throw new NullPointerException(); - this.callable = callable; - } - public final T getRawResult() { return result; } - public final void setRawResult(T v) { result = v; } + abstract T compute() throws Exception; public final boolean exec() { Thread.interrupted(); - runner = Thread.currentThread(); + Thread t = runner = Thread.currentThread(); try { - if (!isDone()) // recheck - result = callable.call(); - return true; - } catch (RuntimeException rex) { - throw rex; - } catch (Exception ex) { - throw new RuntimeException(ex); + if ((t instanceof ForkJoinWorkerThread) && + ForkJoinPool.poolIsStopping(((ForkJoinWorkerThread)t).pool)) + cancel(true); + else { + try { + if (status >= 0) + setRawResult(compute()); + } catch (Exception ex) { + trySetException(ex); + } + } } finally { runner = null; - Thread.interrupted(); } + return true; } - public final void run() { invoke(); } - public final boolean cancel(boolean mayInterruptIfRunning) { + public boolean cancel(boolean mayInterruptIfRunning) { Thread t; - boolean stat = super.cancel(false); - if (mayInterruptIfRunning && (t = runner) != null) { - try { - t.interrupt(); - } catch (Throwable ignore) { + if (trySetCancelled() >= 0) { + if (mayInterruptIfRunning && (t = runner) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } } + return true; } - return stat; + return isCancelled(); } + public final void run() { quietlyInvoke(); } + Object adaptee() { return null; } // for printing and diagnostics public String toString() { - return super.toString() + "[Wrapped task = " + callable + "]"; + Object a = adaptee(); + String s = super.toString(); + return ((a == null) ? s : + (s + "[Wrapped task = " + a.toString() + "]")); } private static final long serialVersionUID = 2838392045355241008L; } /** - * Returns a new {@code ForkJoinTask} that performs the {@code run} - * method of the given {@code Runnable} as its action, and returns - * a null result upon {@link #join}. - * - * @param runnable the runnable action - * @return the task + * Adapter for Callable-based interruptible tasks. */ - public static ForkJoinTask<?> adapt(Runnable runnable) { - return new AdaptedRunnableAction(runnable); + static final class AdaptedInterruptibleCallable<T> extends InterruptibleTask<T> { + @SuppressWarnings("serial") // Conditionally serializable + final Callable<? extends T> callable; + @SuppressWarnings("serial") // Conditionally serializable + T result; + AdaptedInterruptibleCallable(Callable<? extends T> callable) { + Objects.requireNonNull(callable); + this.callable = callable; + } + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + final T compute() throws Exception { return callable.call(); } + final Object adaptee() { return callable; } + private static final long serialVersionUID = 2838392045355241008L; } /** - * Returns a new {@code ForkJoinTask} that performs the {@code run} - * method of the given {@code Runnable} as its action, and returns - * the given result upon {@link #join}. - * - * @param runnable the runnable action - * @param result the result upon completion - * @param <T> the type of the result - * @return the task + * Adapter for Runnable-based interruptible tasks. */ - public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) { - return new AdaptedRunnable<T>(runnable, result); + static final class AdaptedInterruptibleRunnable<T> extends InterruptibleTask<T> { + @SuppressWarnings("serial") // Conditionally serializable + final Runnable runnable; + @SuppressWarnings("serial") // Conditionally serializable + final T result; + AdaptedInterruptibleRunnable(Runnable runnable, T result) { + Objects.requireNonNull(runnable); + this.runnable = runnable; + this.result = result; + } + public final T getRawResult() { return result; } + public final void setRawResult(T v) { } + final T compute() { runnable.run(); return result; } + final Object adaptee() { return runnable; } + private static final long serialVersionUID = 2838392045355241008L; } /** - * Returns a new {@code ForkJoinTask} that performs the {@code call} - * method of the given {@code Callable} as its action, and returns - * its result upon {@link #join}, translating any checked exceptions - * encountered into {@code RuntimeException}. - * - * @param callable the callable action - * @param <T> the type of the callable's result - * @return the task + * Adapter for Runnables in which failure forces worker exception. */ - public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { - return new AdaptedCallable<T>(callable); + static final class RunnableExecuteAction extends InterruptibleTask<Void> { + @SuppressWarnings("serial") // Conditionally serializable + final Runnable runnable; + RunnableExecuteAction(Runnable runnable) { + Objects.requireNonNull(runnable); + this.runnable = runnable; + } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + final Void compute() { runnable.run(); return null; } + final Object adaptee() { return runnable; } + void onAuxExceptionSet(Throwable ex) { // if a handler, invoke it + Thread t; java.lang.Thread.UncaughtExceptionHandler h; + if ((h = ((t = Thread.currentThread()). + getUncaughtExceptionHandler())) != null) { + try { + h.uncaughtException(t, ex); + } catch (Throwable ignore) { + } + } + } + private static final long serialVersionUID = 5232453952276885070L; } /** - * Returns a new {@code ForkJoinTask} that performs the {@code call} - * method of the given {@code Callable} as its action, and returns - * its result upon {@link #join}, translating any checked exceptions - * encountered into {@code RuntimeException}. Additionally, - * invocations of {@code cancel} with {@code mayInterruptIfRunning - * true} will attempt to interrupt the thread performing the task. - * - * @param callable the callable action - * @param <T> the type of the callable's result - * @return the task - * - * @since 19 - */ - public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) { - // https://bugs.openjdk.org/browse/JDK-8246587 - return new AdaptedInterruptibleCallable<T>(callable); - } - - // Serialization support - - private static final long serialVersionUID = -7721805057305804111L; + * Task (that is never forked) to hold results for + * ForkJoinPool.invokeAny, or to report exception if all subtasks + * fail or are cancelled or the pool is terminating. Both + * InvokeAnyRoot and InvokeAnyTask objects exist only transiently + * during invokeAny invocations, so serialization support would be + * nonsensical and is omitted. + */ + @SuppressWarnings("serial") + static final class InvokeAnyRoot<T> extends InterruptibleTask<T> { + volatile T result; + volatile int count; // number of tasks; decremented in case all tasks fail + InvokeAnyRoot() { } + final void tryComplete(InvokeAnyTask<T> f, T v, Throwable ex, + boolean completed) { + if (f != null && !isDone()) { + if (ForkJoinPool.poolIsStopping(getPool())) + trySetCancelled(); + else if (f.setForkJoinTaskStatusMarkerBit() == 0) { + if (completed) { + result = v; + quietlyComplete(); + } + else if (U.getAndAddInt(this, COUNT, -1) <= 1) { + if (ex == null) + trySetCancelled(); + else + trySetException(ex); + } + } + } + } + public final T compute() { return null; } // never forked + public final T getRawResult() { return result; } + public final void setRawResult(T v) { } + + // Common support for timed and untimed versions of invokeAny + final T invokeAny(Collection<? extends Callable<T>> tasks, + ForkJoinPool pool, boolean timed, long nanos) + throws InterruptedException, ExecutionException, TimeoutException { + if ((count = tasks.size()) <= 0) + throw new IllegalArgumentException(); + if (pool == null) + throw new NullPointerException(); + InvokeAnyTask<T> t = null; // list of submitted tasks + try { + for (Callable<T> c : tasks) + pool.execute((ForkJoinTask<?>) + (t = new InvokeAnyTask<T>(c, this, t))); + return timed ? get(nanos, TimeUnit.NANOSECONDS) : get(); + } finally { + for (; t != null; t = t.pred) + t.onRootCompletion(); + } + } - /** - * Saves this task to a stream (that is, serializes it). - * - * @param s the stream - * @throws java.io.IOException if an I/O error occurs - * @serialData the current run status and the exception thrown - * during execution, or {@code null} if none - */ - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - Aux a; - s.defaultWriteObject(); - s.writeObject((a = aux) == null ? null : a.ex); + private static final Unsafe U; + private static final long COUNT; + static { + U = Unsafe.getUnsafe(); + COUNT = U.objectFieldOffset(InvokeAnyRoot.class, "count"); + } } /** - * Reconstitutes this task from a stream (that is, deserializes it). - * @param s the stream - * @throws ClassNotFoundException if the class of a serialized object - * could not be found - * @throws java.io.IOException if an I/O error occurs + * Task with results in InvokeAnyRoot (and never independently + * joined). */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - Object ex = s.readObject(); - if (ex != null) - trySetThrown((Throwable)ex); - } - - static { - U = Unsafe.getUnsafe(); - STATUS = U.objectFieldOffset(ForkJoinTask.class, "status"); - AUX = U.objectFieldOffset(ForkJoinTask.class, "aux"); - Class<?> dep1 = LockSupport.class; // ensure loaded - Class<?> dep2 = Aux.class; + @SuppressWarnings("serial") + static final class InvokeAnyTask<T> extends InterruptibleTask<Void> { + final Callable<? extends T> callable; + final InvokeAnyRoot<T> root; + final InvokeAnyTask<T> pred; // to traverse on cancellation + InvokeAnyTask(Callable<T> callable, InvokeAnyRoot<T> root, + InvokeAnyTask<T> pred) { + Objects.requireNonNull(callable); + this.callable = callable; + this.root = root; + this.pred = pred; + } + final Void compute() throws Exception { + InvokeAnyRoot<T> r = root; + T v = null; Throwable ex = null; boolean completed = false; + if (r != null && !r.isDone()) { + try { + v = callable.call(); + completed = true; + } catch (Exception rex) { + ex = rex; + } finally { + r.tryComplete(this, v, ex, completed); + } + } + return null; + } + public final boolean cancel(boolean mayInterruptIfRunning) { + InvokeAnyRoot<T> r; + boolean stat = super.cancel(mayInterruptIfRunning); + if ((r = root) != null) + r.tryComplete(this, null, null, false); + return stat; + } + final void onRootCompletion() { + if (!isDone()) + super.cancel(true); // no need for tryComplete + } + public final Void getRawResult() { return null; } + public final void setRawResult(Void v) { } + final Object adaptee() { return callable; } } - } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java index f7281998251..c4fc3de9750 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java @@ -76,9 +76,8 @@ public class ForkJoinWorkerThread extends Thread { boolean clearThreadLocals) { super(group, null, pool.nextWorkerThreadName(), 0L, !clearThreadLocals); UncaughtExceptionHandler handler = (this.pool = pool).ueh; - this.workQueue = new ForkJoinPool.WorkQueue(this, 0); - if (clearThreadLocals) - workQueue.setClearThreadLocals(); + this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config, + clearThreadLocals); super.setDaemon(true); if (handler != null) super.setUncaughtExceptionHandler(handler); diff --git a/test/jdk/ProblemList.txt b/test/jdk/ProblemList.txt index ec34e1f461b..8ab03ce62fb 100644 --- a/test/jdk/ProblemList.txt +++ b/test/jdk/ProblemList.txt @@ -723,8 +723,9 @@ com/sun/jdi/InvokeHangTest.java 8218463 linux-al java/util/Locale/LocaleProvidersRun.java 8268379 macosx-x64 sun/util/locale/provider/CalendarDataRegression.java 8268379 macosx-x64 -java/util/concurrent/forkjoin/AsyncShutdownNow.java 8286352 linux-all,windows-x64 -java/util/concurrent/ExecutorService/CloseTest.java 8288899 macosx-aarch64 +java/util/concurrent/SynchronousQueue/Fairness.java 8300663 generic-x64 +java/util/Locale/LocaleProvidersRun.java 8268379 macosx-x64 +sun/util/locale/provider/CalendarDataRegression.java 8268379 macosx-x64 ############################################################################ diff --git a/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java b/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java index 193a4677354..162d4f7d1b4 100644 --- a/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java +++ b/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java @@ -23,9 +23,9 @@ /* * @test - * @summary Test ExecutorService.close, including default implementation + * @summary Test implementations of ExecutorService.close * @library ../lib - * @run testng CloseTest + * @run junit CloseTest */ import java.time.Duration; @@ -37,37 +37,32 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; -import static org.testng.Assert.*; - -public class CloseTest { - - @DataProvider(name = "executors") - public Object[][] executors() { - var defaultThreadFactory = Executors.defaultThreadFactory(); - var virtualThreadFactory = Thread.ofVirtual().factory(); - return new Object[][] { - // ensures that default close method is tested - { new DelegatingExecutorService(Executors.newCachedThreadPool()), }, - - // implementations that may override close - { new ForkJoinPool(), }, - { Executors.newFixedThreadPool(1), }, - { Executors.newCachedThreadPool(), }, - { Executors.newThreadPerTaskExecutor(defaultThreadFactory), }, - { Executors.newThreadPerTaskExecutor(virtualThreadFactory), }, - }; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.jupiter.api.Assertions.*; + +class CloseTest { + + static Stream<ExecutorService> executors() { + return Stream.of( + // ensures that default close method is tested + new DelegatingExecutorService(Executors.newCachedThreadPool()), + + // implementations that may override close + Executors.newCachedThreadPool(), + Executors.newVirtualThreadPerTaskExecutor(), + new ForkJoinPool() + ); } /** * Test close with no tasks running. */ - @Test(dataProvider = "executors") - public void testCloseWithNoTasks(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testCloseWithNoTasks(ExecutorService executor) throws Exception { executor.close(); assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); @@ -77,24 +72,109 @@ public class CloseTest { /** * Test close with tasks running. */ - @Test(dataProvider = "executors") - public void testCloseWithRunningTasks(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testCloseWithRunningTasks(ExecutorService executor) throws Exception { + Phaser phaser = new Phaser(2); Future<?> future = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); Thread.sleep(Duration.ofMillis(100)); return "foo"; }); + phaser.arriveAndAwaitAdvance(); // wait for task to start + executor.close(); // waits for task to complete + assertFalse(Thread.interrupted()); assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS)); - assertEquals(future.resultNow(), "foo"); + assertEquals("foo", future.resultNow()); + } + + /** + * Test shutdown with tasks running. + */ + @ParameterizedTest + @MethodSource("executors") + void testShutdownWithRunningTasks(ExecutorService executor) throws Exception { + Phaser phaser = new Phaser(2); + Future<?> future = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); + Thread.sleep(Duration.ofMillis(100)); + return "foo"; + }); + phaser.arriveAndAwaitAdvance(); // wait for task to start + + executor.shutdown(); + assertFalse(Thread.interrupted()); + assertTrue(executor.isShutdown()); + assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(executor.isTerminated()); + assertEquals("foo", future.resultNow()); + } + + /** + * Test close with multiple tasks running + */ + @ParameterizedTest + @MethodSource("executors") + void testCloseWith2RunningTasks(ExecutorService executor) throws Exception { + Phaser phaser = new Phaser(3); + Future<?> f1 = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); + Thread.sleep(Duration.ofMillis(100)); + return "foo"; + }); + Future<?> f2 = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); + Thread.sleep(Duration.ofMillis(100)); + return "bar"; + }); + phaser.arriveAndAwaitAdvance(); // wait for tasks to start + + executor.close(); // waits for task to complete + assertFalse(Thread.interrupted()); + assertTrue(executor.isShutdown()); + assertTrue(executor.isTerminated()); + assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS)); + assertEquals("foo", f1.resultNow()); + assertEquals("bar", f2.resultNow()); + } + + /** + * Test shutdown with multiple tasks running + */ + @ParameterizedTest + @MethodSource("executors") + void testShutdownWith2RunningTasks(ExecutorService executor) throws Exception { + Phaser phaser = new Phaser(3); + Future<?> f1 = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); + Thread.sleep(Duration.ofMillis(100)); + return "foo"; + }); + Future<?> f2 = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); + Thread.sleep(Duration.ofMillis(100)); + return "bar"; + }); + phaser.arriveAndAwaitAdvance(); // wait for tasks to start + + executor.shutdown(); + assertFalse(Thread.interrupted()); + assertTrue(executor.isShutdown()); + assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(executor.isTerminated()); + assertEquals("foo", f1.resultNow()); + assertEquals("bar", f2.resultNow()); } /** * Test close when executor is shutdown but not terminated. */ - @Test(dataProvider = "executors") - public void testShutdownBeforeClose(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testShutdownBeforeClose(ExecutorService executor) throws Exception { Phaser phaser = new Phaser(2); Future<?> future = executor.submit(() -> { phaser.arriveAndAwaitAdvance(); @@ -104,22 +184,22 @@ public class CloseTest { phaser.arriveAndAwaitAdvance(); // wait for task to start executor.shutdown(); // shutdown, will not immediately terminate - executor.close(); assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS)); - assertEquals(future.resultNow(), "foo"); + Object s = future.resultNow(); + assertEquals("foo", s); } /** * Test close when terminated. */ - @Test(dataProvider = "executors") - public void testTerminateBeforeClose(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testTerminateBeforeClose(ExecutorService executor) throws Exception { executor.shutdown(); assertTrue(executor.isTerminated()); - executor.close(); assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); @@ -129,8 +209,9 @@ public class CloseTest { /** * Test invoking close with interrupt status set. */ - @Test(dataProvider = "executors") - public void testInterruptBeforeClose(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testInterruptBeforeClose(ExecutorService executor) throws Exception { Phaser phaser = new Phaser(2); Future<?> future = executor.submit(() -> { phaser.arriveAndAwaitAdvance(); @@ -149,21 +230,29 @@ public class CloseTest { assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS)); - expectThrows(ExecutionException.class, future::get); + assertThrows(ExecutionException.class, future::get); } /** * Test interrupting thread blocked in close. */ - @Test(dataProvider = "executors") - public void testInterruptDuringClose(ExecutorService executor) throws Exception { + @ParameterizedTest + @MethodSource("executors") + void testInterruptDuringClose(ExecutorService executor) throws Exception { + Phaser phaser = new Phaser(2); Future<?> future = executor.submit(() -> { + phaser.arriveAndAwaitAdvance(); Thread.sleep(Duration.ofDays(1)); return null; }); + phaser.arriveAndAwaitAdvance(); // wait for task to start + + // schedule main thread to be interrupted Thread thread = Thread.currentThread(); new Thread(() -> { - try { Thread.sleep( Duration.ofMillis(500)); } catch (Exception ignore) { } + try { + Thread.sleep( Duration.ofMillis(100)); + } catch (Exception ignore) { } thread.interrupt(); }).start(); try { @@ -175,6 +264,6 @@ public class CloseTest { assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS)); - expectThrows(ExecutionException.class, future::get); + assertThrows(ExecutionException.class, future::get); } } diff --git a/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java b/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java new file mode 100644 index 00000000000..a9a4eb0d8d7 --- /dev/null +++ b/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java @@ -0,0 +1,787 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @summary Test implementations of ExecutorService.invokeAll/invokeAny + * @run junit InvokeTest + */ + +import java.time.Duration; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import static java.lang.Thread.State.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.jupiter.api.Assertions.*; + +class InvokeTest { + + private static ScheduledExecutorService scheduler; + + @BeforeAll + static void setup() throws Exception { + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @AfterAll + static void shutdown() { + scheduler.shutdown(); + } + + private static Stream<ExecutorService> executors() { + return Stream.of( + Executors.newCachedThreadPool(), + Executors.newVirtualThreadPerTaskExecutor(), + new ForkJoinPool() + ); + } + + /** + * Test invokeAny where all tasks complete normally. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny1(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> "bar"; + String result = executor.invokeAny(List.of(task1, task2)); + assertTrue(Set.of("foo", "bar").contains(result)); + } + } + + /** + * Test invokeAny where all tasks complete normally. The completion of the + * first task should cancel remaining tasks. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny2(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + String result = executor.invokeAny(List.of(task1, task2)); + assertEquals("foo", result); + + // if task2 started then it should have been interrupted + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test invokeAny where all tasks complete with exception. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny3(ExecutorService executor) throws Exception { + try (executor) { + class FooException extends Exception { } + Callable<String> task1 = () -> { throw new FooException(); }; + Callable<String> task2 = () -> { throw new FooException(); }; + try { + executor.invokeAny(List.of(task1, task2)); + fail("invokeAny did not throw"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue(cause instanceof FooException); + } + } + } + + /** + * Test invokeAny where all tasks complete with exception. The completion + * of the last task is delayed. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny4(ExecutorService executor) throws Exception { + try (executor) { + class FooException extends Exception { } + Callable<String> task1 = () -> { throw new FooException(); }; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMillis(50)); + throw new FooException(); + }; + try { + executor.invokeAny(List.of(task1, task2)); + fail("invokeAny did not throw"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + assertTrue(cause instanceof FooException); + } + } + } + + /** + * Test invokeAny where some, not all, tasks complete normally. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny5(ExecutorService executor) throws Exception { + try (executor) { + class FooException extends Exception { } + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> { throw new FooException(); }; + String result = executor.invokeAny(List.of(task1, task2)); + assertEquals("foo", result); + } + } + + /** + * Test invokeAny where some, not all, tasks complete normally. The first + * task to complete normally is delayed. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAny6(ExecutorService executor) throws Exception { + try (executor) { + class FooException extends Exception { } + Callable<String> task1 = () -> { + Thread.sleep(Duration.ofMillis(50)); + return "foo"; + }; + Callable<String> task2 = () -> { throw new FooException(); }; + String result = executor.invokeAny(List.of(task1, task2)); + assertEquals("foo", result); + } + } + + /** + * Test timed-invokeAny where all tasks complete normally before the timeout. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyWithTimeout1(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> "bar"; + String result = executor.invokeAny(List.of(task1, task2), 1, TimeUnit.MINUTES); + assertTrue(Set.of("foo", "bar").contains(result)); + } + } + + /** + * Test timed-invokeAny where one task completes normally before the timeout. + * The remaining tests should be cancelled. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyWithTimeout2(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + String result = executor.invokeAny(List.of(task1, task2), 1, TimeUnit.MINUTES); + assertEquals("foo", result); + + // if task2 started then it should have been interrupted + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test timed-invokeAny where timeout expires before any task completes. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyWithTimeout3(ExecutorService executor) throws Exception { + try (executor) { + var task1Started = new AtomicBoolean(); + var task1Interrupted = new CountDownLatch(1); + Callable<String> task1 = () -> { + task1Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task1Interrupted.countDown(); + } + return null; + }; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + // invokeAny should throw TimeoutException + assertThrows(TimeoutException.class, + () -> executor.invokeAny(List.of(task1, task2), 100, TimeUnit.MILLISECONDS)); + + // tasks that started should be interrupted + if (task1Started.get()) { + task1Interrupted.await(); + } + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test invokeAny with interrupt status set. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyWithInterruptSet(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> { + Thread.sleep(Duration.ofMinutes(1)); + return "foo"; + }; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMinutes(1)); + return "bar"; + }; + Thread.currentThread().interrupt(); + try { + executor.invokeAny(List.of(task1, task2)); + fail("invokeAny did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + } + } + + /** + * Test interrupting a thread blocked in invokeAny. + */ + @ParameterizedTest + @MethodSource("executors") + void testInterruptInvokeAny(ExecutorService executor) throws Exception { + try (executor) { + var task1Started = new AtomicBoolean(); + var task1Interrupted = new CountDownLatch(1); + Callable<String> task1 = () -> { + task1Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task1Interrupted.countDown(); + } + return null; + }; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + scheduleInterruptAt("invokeAny"); + try { + executor.invokeAny(List.of(task1, task2)); + fail("invokeAny did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + + // tasks that started should be interrupted + if (task1Started.get()) { + task1Interrupted.await(); + } + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test invokeAny after ExecutorService has been shutdown. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyAfterShutdown(ExecutorService executor) throws Exception { + executor.shutdown(); + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> "bar"; + assertThrows(RejectedExecutionException.class, + () -> executor.invokeAny(List.of(task1, task2))); + } + + /** + * Test invokeAny with empty collection. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyEmpty1(ExecutorService executor) throws Exception { + try (executor) { + assertThrows(IllegalArgumentException.class, () -> executor.invokeAny(List.of())); + } + } + + /** + * Test timed-invokeAny with empty collection. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyEmpty2(ExecutorService executor) throws Exception { + try (executor) { + assertThrows(IllegalArgumentException.class, + () -> executor.invokeAny(List.of(), 1, TimeUnit.MINUTES)); + } + } + + /** + * Test invokeAny with null. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyNull1(ExecutorService executor)throws Exception { + try (executor) { + assertThrows(NullPointerException.class, () -> executor.invokeAny(null)); + } + } + + /** + * Test invokeAny with null element + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAnyNull2(ExecutorService executor)throws Exception { + try (executor) { + List<Callable<String>> list = new ArrayList<>(); + list.add(() -> "foo"); + list.add(null); + assertThrows(NullPointerException.class, () -> executor.invokeAny(null)); + } + } + + /** + * Test invokeAll where all tasks complete normally. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAll1(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMillis(50)); + return "bar"; + }; + + List<Future<String>> futures = executor.invokeAll(List.of(task1, task2)); + assertTrue(futures.size() == 2); + + // check results + List<String> results = futures.stream().map(Future::resultNow).toList(); + assertEquals(results, List.of("foo", "bar")); + } + } + + /** + * Test invokeAll where all tasks complete with exception. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAll2(ExecutorService executor) throws Exception { + try (executor) { + class FooException extends Exception { } + class BarException extends Exception { } + Callable<String> task1 = () -> { throw new FooException(); }; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMillis(50)); + throw new BarException(); + }; + + List<Future<String>> futures = executor.invokeAll(List.of(task1, task2)); + assertTrue(futures.size() == 2); + + // check results + Throwable e1 = assertThrows(ExecutionException.class, () -> futures.get(0).get()); + assertTrue(e1.getCause() instanceof FooException); + Throwable e2 = assertThrows(ExecutionException.class, () -> futures.get(1).get()); + assertTrue(e2.getCause() instanceof BarException); + } + } + + /** + * Test invokeAll where all tasks complete normally before the timeout expires. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAll3(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMillis(50)); + return "bar"; + }; + + List<Future<String>> futures = executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES); + assertTrue(futures.size() == 2); + + // check results + List<String> results = futures.stream().map(Future::resultNow).toList(); + assertEquals(results, List.of("foo", "bar")); + } + } + + /** + * Test invokeAll where some tasks do not complete before the timeout expires. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAll4(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + List<Future<String>> futures = executor.invokeAll(List.of(task1, task2), 1, TimeUnit.SECONDS); + assertTrue(futures.size() == 2); + + // task1 should be done + assertTrue(futures.get(0).isDone()); + + // task2 should be cancelled and interrupted + assertTrue(futures.get(1).isCancelled()); + + // if task2 started then it should have been interrupted + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test invokeAll with interrupt status set. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllInterrupt1(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMinutes(1)); + return "bar"; + }; + + Thread.currentThread().interrupt(); + try { + executor.invokeAll(List.of(task1, task2)); + fail("invokeAll did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + } + } + + /** + * Test timed-invokeAll with interrupt status set. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllInterrupt3(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> { + Thread.sleep(Duration.ofMinutes(1)); + return "bar"; + }; + + Thread.currentThread().interrupt(); + try { + executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES); + fail("invokeAll did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + } + } + + /** + * Test interrupt with thread blocked in invokeAll. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllInterrupt4(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + scheduleInterruptAt("invokeAll"); + try { + executor.invokeAll(List.of(task1, task2)); + fail("invokeAll did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + + // if task2 started then it should have been interrupted + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test interrupt with thread blocked in timed-invokeAll. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllInterrupt6(ExecutorService executor) throws Exception { + try (executor) { + Callable<String> task1 = () -> "foo"; + + var task2Started = new AtomicBoolean(); + var task2Interrupted = new CountDownLatch(1); + Callable<String> task2 = () -> { + task2Started.set(true); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + task2Interrupted.countDown(); + } + return null; + }; + + scheduleInterruptAt("invokeAll"); + try { + executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES); + fail("invokeAll did not throw"); + } catch (InterruptedException expected) { + assertFalse(Thread.currentThread().isInterrupted()); + } finally { + Thread.interrupted(); // clear interrupt + } + + // if task2 started then it should have been interrupted + if (task2Started.get()) { + task2Interrupted.await(); + } + } + } + + /** + * Test invokeAll after ExecutorService has been shutdown. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllAfterShutdown1(ExecutorService executor) throws Exception { + executor.shutdown(); + + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> "bar"; + assertThrows(RejectedExecutionException.class, + () -> executor.invokeAll(List.of(task1, task2))); + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllAfterShutdown2(ExecutorService executor) throws Exception { + executor.shutdown(); + + Callable<String> task1 = () -> "foo"; + Callable<String> task2 = () -> "bar"; + assertThrows(RejectedExecutionException.class, + () -> executor.invokeAll(List.of(task1, task2), 1, TimeUnit.SECONDS)); + } + + /** + * Test invokeAll with empty collection. + */ + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllEmpty1(ExecutorService executor) throws Exception { + try (executor) { + List<Future<Object>> list = executor.invokeAll(List.of()); + assertTrue(list.size() == 0); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllEmpty2(ExecutorService executor) throws Exception { + try (executor) { + List<Future<Object>> list = executor.invokeAll(List.of(), 1, TimeUnit.SECONDS); + assertTrue(list.size() == 0); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllNull1(ExecutorService executor)throws Exception { + try (executor) { + assertThrows(NullPointerException.class, () -> executor.invokeAll(null)); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllNull2(ExecutorService executor)throws Exception { + try (executor) { + List<Callable<String>> tasks = new ArrayList<>(); + tasks.add(() -> "foo"); + tasks.add(null); + assertThrows(NullPointerException.class, () -> executor.invokeAll(tasks)); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllNull3(ExecutorService executor)throws Exception { + try (executor) { + assertThrows(NullPointerException.class, + () -> executor.invokeAll(null, 1, TimeUnit.SECONDS)); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllNull4(ExecutorService executor)throws Exception { + try (executor) { + Callable<String> task = () -> "foo"; + assertThrows(NullPointerException.class, + () -> executor.invokeAll(List.of(task), 1, null)); + } + } + + @ParameterizedTest + @MethodSource("executors") + void testInvokeAllNull5(ExecutorService executor)throws Exception { + try (executor) { + List<Callable<String>> tasks = new ArrayList<>(); + tasks.add(() -> "foo"); + tasks.add(null); + assertThrows(NullPointerException.class, + () -> executor.invokeAll(tasks, 1, TimeUnit.SECONDS)); + } + } + + /** + * Schedules the current thread to be interrupted when it waits (timed or untimed) + * at the given method name. + */ + private void scheduleInterruptAt(String methodName) { + Thread target = Thread.currentThread(); + scheduler.submit(() -> { + try { + boolean found = false; + while (!found) { + Thread.State state = target.getState(); + assertTrue(state != TERMINATED); + if ((state == WAITING || state == TIMED_WAITING) + && contains(target.getStackTrace(), methodName)) { + found = true; + } else { + Thread.sleep(20); + } + } + target.interrupt(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + + /** + * Returns true if the given stack trace contains an element for the given method name. + */ + private boolean contains(StackTraceElement[] stack, String methodName) { + return Arrays.stream(stack) + .anyMatch(e -> methodName.equals(e.getMethodName())); + } +} diff --git a/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java b/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java new file mode 100644 index 00000000000..026d86a6c85 --- /dev/null +++ b/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java @@ -0,0 +1,370 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @summary Test implementations of ExecutorService.submit/execute + * @run junit SubmitTest + */ + +import java.time.Duration; +import java.util.concurrent.*; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.jupiter.api.Assertions.*; + +class SubmitTest { + + private static Stream<ExecutorService> executors() { + return Stream.of( + Executors.newCachedThreadPool(), + Executors.newVirtualThreadPerTaskExecutor(), + new ForkJoinPool() + ); + } + + /** + * Test submit(Runnable) executes the task. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnable(ExecutorService executor) throws Exception { + try (executor) { + var latch = new CountDownLatch(1); + Future<?> future = executor.submit(latch::countDown); + latch.await(); + assertNull(future.get()); + } + } + + /** + * Test submit(Runnable) throws if executor is shutdown. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnableAfterShutdown(ExecutorService executor) { + executor.shutdown(); + assertThrows(RejectedExecutionException.class, () -> executor.submit(() -> { })); + } + + /** + * Test task submitted with submit(Runnable) is not interrupted by cancel(false). + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnableWithCancelFalse(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var stop = new CountDownLatch(1); + var done = new CountDownLatch(1); + Future<?> future = executor.submit(() -> { + started.countDown(); + try { + stop.await(); + } catch (InterruptedException e) { + // ignore + } finally { + done.countDown(); + } + }); + + // wait for task to start + started.await(); + + // cancel(false), task should not be interrupted + future.cancel(false); + assertFalse(done.await(500, TimeUnit.MILLISECONDS)); + + // let task finish + stop.countDown(); + } + } + + /** + * Test task submitted with submit(Runnable) is interrupted by cancel(true). + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnableWithCancelTrue(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var interrupted = new CountDownLatch(1); + Future<?> future = executor.submit(() -> { + started.countDown(); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + interrupted.countDown(); + } + }); + + // wait for task to start + started.await(); + + // cancel(true), task should be interrupted + future.cancel(true); + interrupted.await(); + } + } + + /** + * Test task submitted with submit(Runnable) is interrupted if executor is + * stopped with shutdownNow. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnableWithShutdownNow(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var interrupted = new CountDownLatch(1); + Future<?> future = executor.submit(() -> { + started.countDown(); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + interrupted.countDown(); + } + }); + + // wait for task to start + started.await(); + + // shutdown forcefully, task should be interrupted + executor.shutdownNow(); + interrupted.await(); + } + } + + /** + * Test submit(Runnable) throws if task is null. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitRunnableNull(ExecutorService executor) { + try (executor) { + Runnable nullTask = null; + assertThrows(NullPointerException.class, () -> executor.submit(nullTask)); + assertThrows(NullPointerException.class, () -> executor.submit(nullTask, Void.class)); + } + } + + // + + /** + * Test submit(Callable) executes the task. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallable(ExecutorService executor) throws Exception { + try (executor) { + var latch = new CountDownLatch(1); + Future<String> future = executor.submit(() -> { + latch.countDown(); + return "foo"; + }); + latch.await(); + assertEquals("foo", future.get()); + } + } + + /** + * Test submit(Callable) throws if executor is shutdown. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallableAfterShutdown(ExecutorService executor) { + executor.shutdown(); + assertThrows(RejectedExecutionException.class, () -> executor.submit(() -> null)); + } + + /** + * Test task submitted with submit(Callable) is not interrupted by cancel(false). + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallableWithCancelFalse(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var stop = new CountDownLatch(1); + var done = new CountDownLatch(1); + Future<Void> future = executor.submit(() -> { + started.countDown(); + try { + stop.await(); + } finally { + done.countDown(); + } + return null; + }); + + // wait for task to start + started.await(); + + // cancel(false), task should not be interrupted + future.cancel(false); + assertFalse(done.await(500, TimeUnit.MILLISECONDS)); + + // let task finish + stop.countDown(); + } + } + + /** + * Test task submitted with submit(Callable) is interrupted by cancel(true). + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallableWithCancelTrue(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var interrupted = new CountDownLatch(1); + Future<Void> future = executor.submit(() -> { + started.countDown(); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + interrupted.countDown(); + } + return null; + }); + + // wait for task to start + started.await(); + + // cancel(true), task should be interrupted + future.cancel(true); + interrupted.await(); + } + } + + /** + * Test task submitted with submit(Callable) is interrupted if executor is + * stopped with shutdownNow. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallableWithShutdownNow(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var interrupted = new CountDownLatch(1); + Future<Void> future = executor.submit(() -> { + started.countDown(); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + interrupted.countDown(); + } + return null; + }); + + // wait for task to start + started.await(); + + // shutdown forcefully, task should be interrupted + executor.shutdownNow(); + interrupted.await(); + } + } + + /** + * Test submit(Callable) throws if task is null. + */ + @ParameterizedTest + @MethodSource("executors") + void testSubmitCallableNull(ExecutorService executor) { + try (executor) { + Callable<Void> nullTask = null; + assertThrows(NullPointerException.class, () -> executor.submit(nullTask)); + } + } + + // + + /** + * Test execute(Runnable) executes the task. + */ + @ParameterizedTest + @MethodSource("executors") + void testExecute(ExecutorService executor) throws Exception { + try (executor) { + var latch = new CountDownLatch(1); + executor.execute(latch::countDown); + latch.await(); + } + } + + /** + * Test execute(Runnable) throws if executor is shutdown. + */ + @ParameterizedTest + @MethodSource("executors") + void testExecuteAfterShutdown(ExecutorService executor) { + executor.shutdown(); + assertThrows(RejectedExecutionException.class, () -> executor.execute(() -> { })); + } + + /** + * Test task submitted with execute(Runnable) is interrupted if executor is + * stopped with shutdownNow. + */ + @ParameterizedTest + @MethodSource("executors") + void testExecuteWithShutdownNow(ExecutorService executor) throws Exception { + try (executor) { + var started = new CountDownLatch(1); + var interrupted = new CountDownLatch(1); + executor.execute(() -> { + started.countDown(); + try { + Thread.sleep(Duration.ofDays(1)); + } catch (InterruptedException e) { + interrupted.countDown(); + } + }); + + // wait for task to start + started.await(); + + // shutdown forcefully, task should be interrupted + executor.shutdownNow(); + interrupted.await(); + } + } + + /** + * Test execute(Runnable) throws if task is null. + */ + @ParameterizedTest + @MethodSource("executors") + void testExecuteNull(ExecutorService executor) { + try (executor) { + Runnable nullTask = null; + assertThrows(NullPointerException.class, () -> executor.execute(nullTask)); + } + } +} diff --git a/test/jdk/java/util/concurrent/Future/DefaultMethods.java b/test/jdk/java/util/concurrent/Future/DefaultMethods.java index 2571c568043..dfeb5bde64e 100644 --- a/test/jdk/java/util/concurrent/Future/DefaultMethods.java +++ b/test/jdk/java/util/concurrent/Future/DefaultMethods.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,42 +25,50 @@ * @test * @summary Test Future's default methods * @library ../lib - * @run testng DefaultMethods + * @run junit DefaultMethods */ -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.stream.Stream; import static java.util.concurrent.Future.State.*; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; -import static org.testng.Assert.*; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.jupiter.api.Assertions.*; -public class DefaultMethods { +class DefaultMethods { - @DataProvider(name = "executors") - public Object[][] executors() { - return new Object[][] { - // ensures that default implementation is tested - { new DelegatingExecutorService(Executors.newCachedThreadPool()), }, + static Stream<ExecutorService> executors() { + return Stream.of( + // ensures that default close method is tested + new DelegatingExecutorService(Executors.newCachedThreadPool()), - // executors that may return a Future that overrides the methods - { new ForkJoinPool(), }, - { Executors.newCachedThreadPool(), } - }; + // executors that may return a Future that overrides the methods + Executors.newCachedThreadPool(), + Executors.newVirtualThreadPerTaskExecutor(), + new ForkJoinPool() + ); } /** * Test methods when the task has not completed. */ - @Test(dataProvider = "executors") - public void testRunningTask(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testRunningTask(ExecutorService executor) { try (executor) { var latch = new CountDownLatch(1); Future<?> future = executor.submit(() -> { latch.await(); return null; }); try { assertTrue(future.state() == RUNNING); - expectThrows(IllegalStateException.class, future::resultNow); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::exceptionNow); } finally { latch.countDown(); } @@ -70,41 +78,44 @@ public class DefaultMethods { /** * Test methods when the task has already completed with a result. */ - @Test(dataProvider = "executors") - public void testCompletedTask1(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testCompletedTask1(ExecutorService executor) { try (executor) { Future<String> future = executor.submit(() -> "foo"); awaitDone(future); assertTrue(future.state() == SUCCESS); - assertEquals(future.resultNow(), "foo"); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertEquals("foo", future.resultNow()); + assertThrows(IllegalStateException.class, future::exceptionNow); } } /** * Test methods when the task has already completed with null. */ - @Test(dataProvider = "executors") - public void testCompletedTask2(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testCompletedTask2(ExecutorService executor) { try (executor) { Future<String> future = executor.submit(() -> null); awaitDone(future); assertTrue(future.state() == SUCCESS); - assertEquals(future.resultNow(), null); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertNull(future.resultNow()); + assertThrows(IllegalStateException.class, future::exceptionNow); } } /** * Test methods when the task has completed with an exception. */ - @Test(dataProvider = "executors") - public void testFailedTask(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testFailedTask(ExecutorService executor) { try (executor) { Future<?> future = executor.submit(() -> { throw new ArithmeticException(); }); awaitDone(future); assertTrue(future.state() == FAILED); - expectThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::resultNow); Throwable ex = future.exceptionNow(); assertTrue(ex instanceof ArithmeticException); } @@ -113,16 +124,17 @@ public class DefaultMethods { /** * Test methods when the task has been cancelled (mayInterruptIfRunning=false) */ - @Test(dataProvider = "executors") - public void testCancelledTask1(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testCancelledTask1(ExecutorService executor) { try (executor) { var latch = new CountDownLatch(1); Future<?> future = executor.submit(() -> { latch.await(); return null; }); future.cancel(false); try { assertTrue(future.state() == CANCELLED); - expectThrows(IllegalStateException.class, future::resultNow); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::exceptionNow); } finally { latch.countDown(); } @@ -132,16 +144,17 @@ public class DefaultMethods { /** * Test methods when the task has been cancelled (mayInterruptIfRunning=true) */ - @Test(dataProvider = "executors") - public void testCancelledTask2(ExecutorService executor) { + @ParameterizedTest + @MethodSource("executors") + void testCancelledTask2(ExecutorService executor) { try (executor) { var latch = new CountDownLatch(1); Future<?> future = executor.submit(() -> { latch.await(); return null; }); future.cancel(true); try { assertTrue(future.state() == CANCELLED); - expectThrows(IllegalStateException.class, future::resultNow); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::exceptionNow); } finally { latch.countDown(); } @@ -152,46 +165,46 @@ public class DefaultMethods { * Test CompletableFuture with the task has not completed. */ @Test - public void testCompletableFuture1() { + void testCompletableFuture1() { var future = new CompletableFuture<String>(); assertTrue(future.state() == RUNNING); - expectThrows(IllegalStateException.class, future::resultNow); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::exceptionNow); } /** * Test CompletableFuture with the task that completed with result. */ @Test - public void testCompletableFuture2() { + void testCompletableFuture2() { var future = new CompletableFuture<String>(); future.complete("foo"); assertTrue(future.state() == SUCCESS); - assertEquals(future.resultNow(), "foo"); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertEquals("foo", future.resultNow()); + assertThrows(IllegalStateException.class, future::exceptionNow); } /** * Test CompletableFuture with the task that completed with null. */ @Test - public void testCompletableFuture3() { + void testCompletableFuture3() { var future = new CompletableFuture<String>(); future.complete(null); assertTrue(future.state() == SUCCESS); - assertEquals(future.resultNow(), null); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertNull(future.resultNow()); + assertThrows(IllegalStateException.class, future::exceptionNow); } /** * Test CompletableFuture with the task that completed with exception. */ @Test - public void testCompletableFuture4() { + void testCompletableFuture4() { var future = new CompletableFuture<String>(); future.completeExceptionally(new ArithmeticException()); assertTrue(future.state() == FAILED); - expectThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::resultNow); Throwable ex = future.exceptionNow(); assertTrue(ex instanceof ArithmeticException); } @@ -200,12 +213,12 @@ public class DefaultMethods { * Test CompletableFuture with the task that was cancelled. */ @Test - public void testCompletableFuture5() { + void testCompletableFuture5() { var future = new CompletableFuture<String>(); future.cancel(false); assertTrue(future.state() == CANCELLED); - expectThrows(IllegalStateException.class, future::resultNow); - expectThrows(IllegalStateException.class, future::exceptionNow); + assertThrows(IllegalStateException.class, future::resultNow); + assertThrows(IllegalStateException.class, future::exceptionNow); } /** diff --git a/test/jdk/java/util/concurrent/TEST.properties b/test/jdk/java/util/concurrent/TEST.properties new file mode 100644 index 00000000000..74a024f2880 --- /dev/null +++ b/test/jdk/java/util/concurrent/TEST.properties @@ -0,0 +1 @@ +maxOutputSize=2000000 diff --git a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java index 7f7618a2189..5af31673167 100644 --- a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java +++ b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -23,11 +23,11 @@ /* * @test - * @run testng AsyncShutdownNow - * @summary Test invoking shutdownNow with threads blocked in Future.get, - * invokeAll, and invokeAny + * @summary Test ForkJoinPool.shutdownNow with threads blocked in invokeXXX and Future.get + * @run junit AsyncShutdownNow */ +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -38,42 +38,39 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static java.lang.Thread.State.*; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; -import static org.testng.Assert.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import static org.junit.jupiter.api.Assertions.*; -public class AsyncShutdownNow { +class AsyncShutdownNow { // long running interruptible task private static final Callable<Void> SLEEP_FOR_A_DAY = () -> { - Thread.sleep(86400_000); + Thread.sleep(Duration.ofDays(1)); return null; }; - /** - * The executors to test. - */ - @DataProvider(name = "executors") - public Object[][] executors() { - return new Object[][] { - { new ForkJoinPool() }, - { new ForkJoinPool(1) }, - }; + static Stream<ForkJoinPool> pools() { + return Stream.of( + new ForkJoinPool(), + new ForkJoinPool(1) + ); } /** - * Test shutdownNow with running task and thread blocked in Future::get. + * Test shutdownNow with a running task and main thread blocked in Future::get. */ - @Test(dataProvider = "executors") - public void testFutureGet(ExecutorService executor) throws Exception { - System.out.format("testFutureGet: %s%n", executor); - try (executor) { - Future<?> future = executor.submit(SLEEP_FOR_A_DAY); + @ParameterizedTest + @MethodSource("pools") + void testFutureGet(ForkJoinPool pool) throws Exception { + try (pool) { + Future<?> future = pool.submit(SLEEP_FOR_A_DAY); - // shutdownNow when main thread waits in ForkJoinTask.get - onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow); + // shutdownNow when main thread waits in ForkJoinTask.awaitDone + onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow); try { future.get(); fail(); @@ -84,16 +81,16 @@ public class AsyncShutdownNow { } /** - * Test shutdownNow with running task and thread blocked in a timed Future::get. + * Test shutdownNow with a running task and main thread blocked in a timed Future::get. */ - @Test(dataProvider = "executors") - public void testTimedFutureGet(ExecutorService executor) throws Exception { - System.out.format("testTimedFutureGet: %s%n", executor); - try (executor) { - Future<?> future = executor.submit(SLEEP_FOR_A_DAY); + @ParameterizedTest + @MethodSource("pools") + void testTimedFutureGet(ForkJoinPool pool) throws Exception { + try (pool) { + Future<?> future = pool.submit(SLEEP_FOR_A_DAY); - // shutdownNow when main thread waits in ForkJoinTask.get - onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow); + // shutdownNow when main thread waits in ForkJoinTask.awaitDone + onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow); try { future.get(1, TimeUnit.HOURS); fail(); @@ -104,15 +101,15 @@ public class AsyncShutdownNow { } /** - * Test shutdownNow with thread blocked in invokeAll. + * Test shutdownNow with running tasks and main thread blocked in invokeAll. */ - @Test(dataProvider = "executors") - public void testInvokeAll(ExecutorService executor) throws Exception { - System.out.format("testInvokeAll: %s%n", executor); - try (executor) { - // shutdownNow when main thread waits in ForkJoinTask.quietlyJoin - onWait("java.util.concurrent.ForkJoinTask.quietlyJoin", executor::shutdownNow); - List<Future<Void>> futures = executor.invokeAll(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY)); + @ParameterizedTest + @MethodSource("pools") + void testInvokeAll(ForkJoinPool pool) throws Exception { + try (pool) { + // shutdownNow when main thread waits in ForkJoinTask.awaitDone + onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow); + List<Future<Void>> futures = pool.invokeAll(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY)); for (Future<Void> f : futures) { assertTrue(f.isDone()); try { @@ -126,16 +123,16 @@ public class AsyncShutdownNow { } /** - * Test shutdownNow with thread blocked in invokeAny. + * Test shutdownNow with running tasks and main thread blocked in invokeAny. */ - @Test(dataProvider = "executors", enabled = false) - public void testInvokeAny(ExecutorService executor) throws Exception { - System.out.format("testInvokeAny: %s%n", executor); - try (executor) { + @ParameterizedTest + @MethodSource("pools") + void testInvokeAny(ForkJoinPool pool) throws Exception { + try (pool) { // shutdownNow when main thread waits in ForkJoinTask.get - onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow); + onWait("java.util.concurrent.ForkJoinTask.get", pool::shutdownNow); try { - executor.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY)); + pool.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY)); fail(); } catch (ExecutionException e) { // expected diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java index 320f908cd3d..5b1dcfc2b34 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java @@ -249,14 +249,17 @@ public class ForkJoinPool19Test extends JSR166TestCase { FailingFibAction(int n) { number = n; } public void compute() { int n = number; - if (n <= 1) - throw new FJException(); - else { - FailingFibAction f1 = new FailingFibAction(n - 1); - FailingFibAction f2 = new FailingFibAction(n - 2); - invokeAll(f1, f2); - result = f1.result + f2.result; + if (n > 1) { + try { + FailingFibAction f1 = new FailingFibAction(n - 1); + FailingFibAction f2 = new FailingFibAction(n - 2); + invokeAll(f1, f2); + result = f1.result + f2.result; + return; + } catch (CancellationException fallthrough) { + } } + throw new FJException(); } } @@ -389,6 +392,7 @@ public class ForkJoinPool19Test extends JSR166TestCase { } f.quietlyJoin(); checkCancelled(f); + Thread.interrupted(); }}; checkInvoke(a); a.reinitialize(); @@ -504,36 +508,78 @@ public class ForkJoinPool19Test extends JSR166TestCase { * Implicitly closing a new pool using try-with-resources terminates it */ public void testClose() { - ForkJoinTask f = new FibAction(8); - ForkJoinPool pool = null; - try (ForkJoinPool p = new ForkJoinPool()) { - pool = p; - p.execute(f); - } - checkCompletedNormally(f); - assertTrue(pool != null && pool.isTerminated()); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + FibAction f = new FibAction(1); + ForkJoinPool pool = null; + try (ForkJoinPool p = new ForkJoinPool()) { + pool = p; + p.execute(f); + } + assertTrue(pool != null && pool.isTerminated()); + f.join(); + assertEquals(1, f.result); + }}); + awaitTermination(t); } /** - * Implicitly closing common pool using try-with-resources has no effect. + * Explicitly closing a new pool terminates it */ - public void testCloseCommonPool() { - ForkJoinTask f = new FibAction(8); - ForkJoinPool pool; - try (ForkJoinPool p = pool = ForkJoinPool.commonPool()) { - p.execute(f); - } + public void testClose2() { + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + ForkJoinPool pool = new ForkJoinPool(); + FibAction f = new FibAction(1); + pool.execute(f); + pool.close(); + assertTrue(pool.isTerminated()); + f.join(); + assertEquals(1, f.result); + }}); + awaitTermination(t); + } - assertFalse(pool.isShutdown()); - assertFalse(pool.isTerminating()); - assertFalse(pool.isTerminated()); + /** + * Explicitly closing a shutdown pool awaits termination + */ + public void testClose3() { + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + ForkJoinPool pool = new ForkJoinPool(); + FibAction f = new FibAction(1); + pool.execute(f); + pool.shutdown(); + pool.close(); + assertTrue(pool.isTerminated()); + f.join(); + assertEquals(1, f.result); + }}); + awaitTermination(t); + } + /** + * Implicitly closing common pool using try-with-resources has no effect. + */ + public void testCloseCommonPool() { String prop = System.getProperty( "java.util.concurrent.ForkJoinPool.common.parallelism"); - if (! "0".equals(prop)) { - f.join(); - checkCompletedNormally(f); - } + boolean nothreads = "0".equals(prop); + Thread t = newStartedThread(new CheckedRunnable() { + public void realRun() throws InterruptedException { + ForkJoinTask f = new FibAction(8); + ForkJoinPool pool; + try (ForkJoinPool p = pool = ForkJoinPool.commonPool()) { + p.execute(f); + } + assertFalse(pool.isShutdown()); + assertFalse(pool.isTerminating()); + assertFalse(pool.isTerminated()); + if (!nothreads) { + f.join(); + checkCompletedNormally(f); + } + }}); + awaitTermination(t); } - } diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java index 9be6ce3d418..2bc5021fcef 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java @@ -232,14 +232,17 @@ public class ForkJoinPool8Test extends JSR166TestCase { FailingFibAction(int n) { number = n; } public void compute() { int n = number; - if (n <= 1) - throw new FJException(); - else { - FailingFibAction f1 = new FailingFibAction(n - 1); - FailingFibAction f2 = new FailingFibAction(n - 2); - invokeAll(f1, f2); - result = f1.result + f2.result; + if (n > 1) { + try { + FailingFibAction f1 = new FailingFibAction(n - 1); + FailingFibAction f2 = new FailingFibAction(n - 2); + invokeAll(f1, f2); + result = f1.result + f2.result; + return; + } catch (CancellationException fallthrough) { + } } + throw new FJException(); } } @@ -402,7 +405,9 @@ public class ForkJoinPool8Test extends JSR166TestCase { try { f.get(randomTimeout(), null); shouldThrow(); - } catch (NullPointerException success) {} + } catch (NullPointerException success) { + f.join(); + } }}; checkInvoke(a); } diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java index 781b7bffe6d..7db3e20a2fb 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java @@ -627,6 +627,22 @@ public class ForkJoinPoolTest extends JSR166TestCase { } /** + * invoke throws a RuntimeException if task throws unchecked exception + */ + public void testInvokeUncheckedException() throws Throwable { + ForkJoinPool p = new ForkJoinPool(1); + try (PoolCleaner cleaner = cleaner(p)) { + try { + p.invoke(ForkJoinTask.adapt(new Callable<Object>() { + public Object call() { throw new ArithmeticException(); }})); + shouldThrow(); + } catch (RuntimeException success) { + assertTrue(success.getCause() instanceof ArithmeticException); + } + } + } + + /** * invokeAny(null) throws NullPointerException */ public void testInvokeAny1() throws Throwable { diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java index 4eaa9d2a3db..24989cb152b 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java @@ -496,7 +496,9 @@ public class ForkJoinTaskTest extends JSR166TestCase { try { f.get(randomTimeout(), null); shouldThrow(); - } catch (NullPointerException success) {} + } catch (NullPointerException success) { + f.join(); + } }}; testInvokeOnPool(mainPool(), a); } @@ -1245,7 +1247,9 @@ public class ForkJoinTaskTest extends JSR166TestCase { try { f.get(randomTimeout(), null); shouldThrow(); - } catch (NullPointerException success) {} + } catch (NullPointerException success) { + f.join(); + } }}; testInvokeOnPool(singletonPool(), a); } diff --git a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java index 1f5a6e09683..76ab189e553 100644 --- a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java +++ b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java @@ -129,6 +129,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; @@ -1669,11 +1670,20 @@ public class JSR166TestCase extends TestCase { checkTimedGet(f, expectedValue, LONG_DELAY_MS); } + // Avoids unwanted interrupts when run inder jtreg + static ThreadGroup topThreadGroup() { + for (ThreadGroup g = Thread.currentThread().getThreadGroup(), p; ; g = p) + if ((p = g.getParent()) == null) + return g; + } + static final ThreadGroup jsr166TestThreadGroup = + new ThreadGroup(topThreadGroup(), "jsr1666TestThreadGroup"); + /** * Returns a new started daemon Thread running the given runnable. */ Thread newStartedThread(Runnable runnable) { - Thread t = new Thread(runnable); + Thread t = new Thread(jsr166TestThreadGroup, runnable); t.setDaemon(true); t.start(); return t; @@ -1693,10 +1703,12 @@ public class JSR166TestCase extends TestCase { * the thread (in the hope that it may terminate later) and fails. */ void awaitTermination(Thread thread, long timeoutMillis) { - try { - thread.join(timeoutMillis); - } catch (InterruptedException fail) { - threadUnexpectedException(fail); + for (;;) { // ignore stray interrupts by test harness + try { + thread.join(timeoutMillis); + break; + } catch (InterruptedException ignore) { + } } if (thread.getState() != Thread.State.TERMINATED) { String detail = String.format( @@ -1940,6 +1952,8 @@ public class JSR166TestCase extends TestCase { @Override protected final void compute() { try { realCompute(); + } catch (CancellationException ex) { + throw ex; // expected by some tests } catch (Throwable fail) { threadUnexpectedException(fail); } @@ -1955,6 +1969,8 @@ public class JSR166TestCase extends TestCase { @Override protected final T compute() { try { return realCompute(); + } catch (CancellationException ex) { + throw ex; } catch (Throwable fail) { threadUnexpectedException(fail); } diff --git a/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java b/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java index 64115eec1c9..69bae253dd1 100644 --- a/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java +++ b/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java @@ -162,10 +162,10 @@ public class RecursiveActionTest extends JSR166TestCase { void checkCompletedAbnormally(RecursiveAction a, Throwable t) { assertTrue(a.isDone()); - assertFalse(a.isCancelled()); assertFalse(a.isCompletedNormally()); assertTrue(a.isCompletedAbnormally()); - assertSame(t.getClass(), a.getException().getClass()); + if (!a.isCancelled()) + assertSame(t.getClass(), a.getException().getClass()); assertNull(a.getRawResult()); assertFalse(a.cancel(false)); assertFalse(a.cancel(true)); @@ -222,14 +222,17 @@ public class RecursiveActionTest extends JSR166TestCase { FailingFibAction(int n) { number = n; } public void compute() { int n = number; - if (n <= 1) - throw new FJException(); - else { - FailingFibAction f1 = new FailingFibAction(n - 1); - FailingFibAction f2 = new FailingFibAction(n - 2); - invokeAll(f1, f2); - result = f1.result + f2.result; + if (n > 1) { + try { + FailingFibAction f1 = new FailingFibAction(n - 1); + FailingFibAction f2 = new FailingFibAction(n - 2); + invokeAll(f1, f2); + result = f1.result + f2.result; + return; + } catch (CancellationException fallthrough) { + } } + throw new FJException(); } } @@ -488,7 +491,9 @@ public class RecursiveActionTest extends JSR166TestCase { try { f.get(randomTimeout(), null); shouldThrow(); - } catch (NullPointerException success) {} + } catch (NullPointerException success) { + f.join(); + } }}; testInvokeOnPool(mainPool(), a); } diff --git a/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java b/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java index 66745f4a199..348fef78a6f 100644 --- a/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java +++ b/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java @@ -178,10 +178,10 @@ public class RecursiveTaskTest extends JSR166TestCase { void checkCompletedAbnormally(RecursiveTask<?> a, Throwable t) { assertTrue(a.isDone()); - assertFalse(a.isCancelled()); assertFalse(a.isCompletedNormally()); assertTrue(a.isCompletedAbnormally()); - assertSame(t.getClass(), a.getException().getClass()); + if (!a.isCancelled()) + assertSame(t.getClass(), a.getException().getClass()); assertNull(a.getRawResult()); assertFalse(a.cancel(false)); assertFalse(a.cancel(true)); @@ -240,11 +240,15 @@ public class RecursiveTaskTest extends JSR166TestCase { FailingFibTask(int n) { number = n; } public Integer compute() { int n = number; - if (n <= 1) - throw new FJException(); - FailingFibTask f1 = new FailingFibTask(n - 1); - f1.fork(); - return new FibTask(n - 2).compute() + f1.join(); + if (n > 1) { + try { + FailingFibTask f1 = new FailingFibTask(n - 1); + f1.fork(); + return new FibTask(n - 2).compute() + f1.join(); + } catch (CancellationException fallthrough) { + } + } + throw new FJException(); } } diff --git a/test/jdk/java/util/concurrent/tck/tck.policy b/test/jdk/java/util/concurrent/tck/tck.policy index 66f79ab3e39..62018f792a3 100644 --- a/test/jdk/java/util/concurrent/tck/tck.policy +++ b/test/jdk/java/util/concurrent/tck/tck.policy @@ -1,6 +1,7 @@ grant { // Permissions j.u.c. needs directly permission java.lang.RuntimePermission "modifyThread"; + permission java.lang.RuntimePermission "modifyThreadGroup"; permission java.lang.RuntimePermission "getClassLoader"; permission java.lang.RuntimePermission "setContextClassLoader"; permission java.util.PropertyPermission "*", "read"; |