aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaxim Kartashev <maxim.kartashev@jetbrains.com>2024-01-10 12:19:12 +0400
committerMaxim Kartashev <maxim.kartashev@jetbrains.com>2024-01-24 18:32:40 +0400
commit3a6456e1f447c2f02361469a8f140ef5ec0e63f5 (patch)
tree8f68f2b434bbbe1aa6ef313de98f3dbb991dc637
parent283beedc05820aa3a6fef31303f6318920d452aa (diff)
downloadJetBrainsRuntime-3a6456e1f447c2f02361469a8f140ef5ec0e63f5.tar.gz
8288899: java/util/concurrent/ExecutorService/CloseTest.java failed with "InterruptedException: sleep interrupted"
Reviewed-by: alanb
-rw-r--r--src/java.base/share/classes/java/util/concurrent/CountedCompleter.java21
-rw-r--r--src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java3019
-rw-r--r--src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java1130
-rw-r--r--src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java5
-rw-r--r--test/jdk/ProblemList.txt5
-rw-r--r--test/jdk/java/util/concurrent/ExecutorService/CloseTest.java177
-rw-r--r--test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java787
-rw-r--r--test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java370
-rw-r--r--test/jdk/java/util/concurrent/Future/DefaultMethods.java119
-rw-r--r--test/jdk/java/util/concurrent/TEST.properties1
-rw-r--r--test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java95
-rw-r--r--test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java106
-rw-r--r--test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java21
-rw-r--r--test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java16
-rw-r--r--test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java8
-rw-r--r--test/jdk/java/util/concurrent/tck/JSR166TestCase.java26
-rw-r--r--test/jdk/java/util/concurrent/tck/RecursiveActionTest.java25
-rw-r--r--test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java18
-rw-r--r--test/jdk/java/util/concurrent/tck/tck.policy1
19 files changed, 3883 insertions, 2067 deletions
diff --git a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
index 3278d191074..7884a131ffb 100644
--- a/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
+++ b/src/java.base/share/classes/java/util/concurrent/CountedCompleter.java
@@ -524,6 +524,10 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
return pending;
}
+ final void initPending(int count) {
+ U.putInt(this, PENDING, count);
+ }
+
/**
* Sets the pending count to the given value.
*
@@ -724,26 +728,27 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* processed.
*/
public final void helpComplete(int maxTasks) {
- ForkJoinPool.WorkQueue q; Thread t; boolean owned;
- if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ForkJoinPool.WorkQueue q; Thread t; boolean internal;
+ if (internal =
+ (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonQueue();
if (q != null && maxTasks > 0)
- q.helpComplete(this, owned, maxTasks);
+ q.helpComplete(this, internal, maxTasks);
}
+
// ForkJoinTask overrides
/**
* Supports ForkJoinTask exception propagation.
*/
@Override
- final int trySetException(Throwable ex) {
+ final void onAuxExceptionSet(Throwable ex) {
CountedCompleter<?> a = this, p = a;
- do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
- a.onExceptionalCompletion(ex, p) &&
- (a = (p = a).completer) != null && a.status >= 0);
- return status;
+ do {} while (a.onExceptionalCompletion(ex, p) &&
+ (a = (p = a).completer) != null &&
+ a.trySetThrown(ex));
}
/**
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 5e698b1540f..121f5cefbd6 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -47,11 +47,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.function.Predicate;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
@@ -187,27 +186,39 @@ public class ForkJoinPool extends AbstractExecutorService {
* Implementation Overview
*
* This class and its nested classes provide the main
- * functionality and control for a set of worker threads:
- * Submissions from non-FJ threads enter into submission queues.
- * Workers take these tasks and typically split them into subtasks
- * that may be stolen by other workers. Work-stealing based on
- * randomized scans generally leads to better throughput than
- * "work dealing" in which producers assign tasks to idle threads,
- * in part because threads that have finished other tasks before
- * the signalled thread wakes up (which can be a long time) can
- * take the task instead. Preference rules give first priority to
- * processing tasks from their own queues (LIFO or FIFO, depending
- * on mode), then to randomized FIFO steals of tasks in other
- * queues. This framework began as vehicle for supporting
- * tree-structured parallelism using work-stealing. Over time,
- * its scalability advantages led to extensions and changes to
- * better support more diverse usage contexts. Because most
- * internal methods and nested classes are interrelated, their
- * main rationale and descriptions are presented here; individual
- * methods and nested classes contain only brief comments about
- * details. There are a fair number of odd code constructions and
- * design decisions for components that reside at the edge of Java
- * vs JVM functionality.
+ * functionality and control for a set of worker threads. Because
+ * most internal methods and nested classes are interrelated,
+ * their main rationale and descriptions are presented here;
+ * individual methods and nested classes contain only brief
+ * comments about details. Broadly: submissions from non-FJ
+ * threads enter into submission queues. Workers take these tasks
+ * and typically split them into subtasks that may be stolen by
+ * other workers. Work-stealing based on randomized scans
+ * generally leads to better throughput than "work dealing" in
+ * which producers assign tasks to idle threads, in part because
+ * threads that have finished other tasks before the signalled
+ * thread wakes up (which can be a long time) can take the task
+ * instead. Preference rules give first priority to processing
+ * tasks from their own queues (LIFO or FIFO, depending on mode),
+ * then to randomized FIFO steals of tasks in other queues. This
+ * framework began as vehicle for supporting tree-structured
+ * parallelism using work-stealing. Over time, its scalability
+ * advantages led to extensions and changes to better support more
+ * diverse usage contexts. Here's a brief history of major
+ * revisions, each also with other minor features and changes.
+ *
+ * 1. Only handle recursively structured computational tasks
+ * 2. Async (FIFO) mode and striped submission queues
+ * 3. Completion-based tasks (mainly CountedCompleters)
+ * 4. CommonPool and parallelStream support
+ * 5. InterruptibleTasks for externally submitted tasks
+ *
+ * Most changes involve adaptions of base algorithms using
+ * combinations of static and dynamic bitwise mode settings (both
+ * here and in ForkJoinTask), and subclassing of ForkJoinTask.
+ * There are a fair number of odd code constructions and design
+ * decisions for components that reside at the edge of Java vs JVM
+ * functionality.
*
* WorkQueues
* ==========
@@ -238,16 +249,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* Nardelli, PPoPP 2013
* (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
* analysis of memory ordering requirements in work-stealing
- * algorithms similar to the one used here. We also use ordered,
- * moded accesses and/or fences for other control, with modes
- * reflecting the presence or absence of other contextual sync
- * provided by atomic and/or volatile accesses. Some methods (or
- * their primary loops) begin with an acquire fence or
- * otherwise-unnecessary volatile read that amounts to an
- * acquiring read of "this" to cover all fields (which is
- * sometimes stronger than necessary, but less brittle). Some
- * constructions are intentionally racy because they use read
- * values as hints, not for correctness.
+ * algorithms similar to the one used here. We use per-operation
+ * ordered writes of various kinds for updates, but usually use
+ * explicit load fences for reads, to cover access of several
+ * fields of possibly several objects without further constraining
+ * read-by-read ordering.
*
* We also support a user mode in which local task processing is
* in FIFO, not LIFO order, simply by using a local version of
@@ -256,7 +262,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* increased contention among task producers and consumers. Also,
* the same data structure (and class) is used for "submission
* queues" (described below) holding externally submitted tasks,
- * that differ only in that a lock (field "access"; see below) is
+ * that differ only in that a lock (using field "phase"; see below) is
* required by external callers to push and pop tasks.
*
* Adding tasks then takes the form of a classic array push(task)
@@ -267,8 +273,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* uses masking, not mod, for indexing a power-of-two-sized array,
* enforces memory ordering, supports resizing, and possibly
* signals waiting workers to start scanning (described below),
- * which requires even internal usages to strictly order accesses
- * (using a form of lock release).
+ * which requires stronger forms of order accesses.
*
* The pop operation (always performed by owner) is of the form:
* if ((task = getAndSet(q.array, (q.top-1) % length, null)) != null)
@@ -289,7 +294,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* * Slot k must be read with an acquiring read, which it must
* anyway to dereference and run the task if the (acquiring)
* CAS succeeds, but uses an explicit acquire fence to support
- * the following rechecks even if the CAS is not attempted.
+ * the following rechecks even if the CAS is not attempted. To
+ * more easily distinguish among kinds of CAS failures, we use
+ * the compareAndExchange version, and usually handle null
+ * returns (indicating contention) separately from others.
*
* * q.base may change between reading and using its value to
* index the slot. To avoid trying to use the wrong t, the
@@ -310,8 +318,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* q.base doesn't change on repeated reads of null t and when
* no other alternatives apply, spin-wait for it to settle. To
* reduce producing these kinds of stalls by other stealers, we
- * encourage timely writes to indices using store fences when
- * memory ordering is not already constrained by context.
+ * encourage timely writes to indices using otherwise
+ * unnecessarily strong writes.
*
* * The CAS may fail, in which case we may want to retry unless
* there is too much contention. One goal is to balance and
@@ -329,12 +337,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* when deciding whether to try to poll or repoll after a
* failure. Both top and base may move independently, and both
* lag updates to the underlying array. To reduce memory
- * contention, when possible, non-owners avoid reading the
- * "top" index at all, and instead use array reads, including
- * one-ahead reads to check whether to repoll, relying on the
- * fact that a non-empty queue does not have two null slots in
- * a row, except in cases (resizes and shifts) that can be
- * detected with a secondary recheck.
+ * contention, non-owners avoid reading the "top" when
+ * possible, by using one-ahead reads to check whether to
+ * repoll, relying on the fact that a non-empty queue does not
+ * have two null slots in a row, except in cases (resizes and
+ * shifts) that can be detected with a secondary recheck that
+ * is less likely to conflict with owner writes.
*
* The poll operations in q.poll(), scan(), helpJoin(), and
* elsewhere differ with respect to whether other queues are
@@ -362,14 +370,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* like workers except that they are restricted to executing local
* tasks that they submitted (or when known, subtasks thereof).
* Insertion of tasks in shared mode requires a lock. We use only
- * a simple spinlock because submitters encountering a busy queue
- * move to a different position to use or create other queues.
- * They (spin) block when registering new queues, and less
- * often in tryRemove and helpComplete. The lock needed for
- * external queues is generalized (as field "access") for
- * operations on owned queues that require a fully-fenced write
- * (including push, parking status, and termination) in order to
- * deal with Dekker-like signalling constructs described below.
+ * a simple spinlock (as one role of field "phase") because
+ * submitters encountering a busy queue move to a different
+ * position to use or create other queues. They (spin) block when
+ * registering new queues, or indirectly elsewhere, by revisiting
+ * later.
*
* Management
* ==========
@@ -395,29 +400,58 @@ public class ForkJoinPool extends AbstractExecutorService {
* restrict maximum parallelism to (1<<15)-1 (which is far in
* excess of normal operating range) to allow ids, counts, and
* their negations (used for thresholding) to fit into 16bit
- * subfields. Field "parallelism" holds the target parallelism
- * (normally corresponding to pool size). It is needed (nearly)
- * only in methods updating ctl, so is packed nearby. As of the
- * current release, users can dynamically reset target
- * parallelism, which is read once per update, so only slowly has
- * an effect in creating threads or letting them time out and
- * terminate when idle.
- *
- * Field "runState" holds lifetime status, atomically and
- * monotonically setting SHUTDOWN, STOP, and finally TERMINATED
- * bits. It is updated only via bitwise atomics (getAndBitwiseOr).
+ * subfields.
+ *
+ * Field "runState" and per-WorkQueue field "phase" play similar
+ * roles, as lockable, versioned counters. Field runState also
+ * includes monotonic event bits (SHUTDOWN, STOP, and TERMINATED).
+ * The version tags enable detection of state changes (by
+ * comparing two reads) modulo bit wraparound. The bit range in
+ * each case suffices for purposes of determining quiescence,
+ * termination, avoiding ABA-like errors, and signal control, most
+ * of which are ultimately based on at most 15bit ranges (due to
+ * 32767 max total workers). RunState updates do not need to be
+ * atomic with respect to ctl updates, but because they are not,
+ * some care is required to avoid stalls. The seqLock properties
+ * detect changes and conditionally upgrade to coordinate with
+ * updates. It is typically held for less than a dozen
+ * instructions unless the queue array is being resized, during
+ * which contention is rare. To be conservative, lockRunState is
+ * implemented as a spin/sleep loop. Here and elsewhere spin
+ * constants are short enough to apply even on systems with few
+ * available processors. In addition to checking pool status,
+ * reads of runState sometimes serve as acquire fences before
+ * reading other fields.
+ *
+ * Field "parallelism" holds the target parallelism (normally
+ * corresponding to pool size). Users can dynamically reset target
+ * parallelism, but is only accessed when signalling or awaiting
+ * work, so only slowly has an effect in creating threads or
+ * letting them time out and terminate when idle.
*
* Array "queues" holds references to WorkQueues. It is updated
* (only during worker creation and termination) under the
- * registrationLock, but is otherwise concurrently readable (often
- * prefaced by a volatile read of mode to check termination, that
- * is required anyway, and serves as an acquire fence). To
- * simplify index-based operations, the array size is always a
- * power of two, and all readers must tolerate null slots. Worker
- * queues are at odd indices. Worker ids masked with SMASK match
- * their index. Shared (submission) queues are at even
- * indices. Grouping them together in this way simplifies and
- * speeds up task scanning.
+ * runState lock. It is otherwise concurrently readable but reads
+ * for use in scans (see below) are always prefaced by a volatile
+ * read of runState (or equivalent constructions), ensuring that
+ * its state is current at the point it is used (which is all we
+ * require). To simplify index-based operations, the array size is
+ * always a power of two, and all readers must tolerate null
+ * slots. Worker queues are at odd indices. Worker phase ids
+ * masked with SMASK match their index. Shared (submission) queues
+ * are at even indices. Grouping them together in this way aids in
+ * task scanning: At top-level, both kinds of queues should be
+ * sampled with approximately the same probability, which is
+ * simpler if they are all in the same array. But we also need to
+ * identify what kind they are without looking at them, leading to
+ * this odd/even scheme. One disadvantage is that there are
+ * usually many fewer submission queues, so there can be many
+ * wasted probes (null slots). But this is still cheaper than
+ * alternatives. Other loops over the queues array vary in origin
+ * and stride depending on whether they cover only submission
+ * (even) or worker (odd) queues or both, and whether they require
+ * randomness (in which case cyclically exhaustive strides may be
+ * used).
*
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
@@ -429,7 +463,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* one source of some of the messy code constructions here). In
* essence, the queues array serves as a weak reference
* mechanism. In particular, the stack top subfield of ctl stores
- * indices, not references.
+ * indices, not references. Operations on queues obtained from
+ * these indices remain valid (with at most some unnecessary extra
+ * work) even if an underlying worker failed and was replaced by
+ * another at the same index. During termination, worker queue
+ * array updates are disabled.
*
* Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
* cannot let workers spin indefinitely scanning for tasks when
@@ -438,28 +476,30 @@ public class ForkJoinPool extends AbstractExecutorService {
* other hand, we must quickly prod them into action when new
* tasks are submitted or generated. These latencies are mainly a
* function of JVM park/unpark (and underlying OS) performance,
- * which can be slow and variable. In many usages, ramp-up time
+ * which can be slow and variable (even though usages are
+ * streamlined as much as possible). In many usages, ramp-up time
* is the main limiting factor in overall performance, which is
* compounded at program start-up by JIT compilation and
* allocation. On the other hand, throughput degrades when too
- * many threads poll for too few tasks.
+ * many threads poll for too few tasks. (See below.)
*
* The "ctl" field atomically maintains total and "released"
* worker counts, plus the head of the available worker queue
* (actually stack, represented by the lower 32bit subfield of
* ctl). Released workers are those known to be scanning for
- * and/or running tasks. Unreleased ("available") workers are
- * recorded in the ctl stack. These workers are made eligible for
- * signalling by enqueuing in ctl (see method awaitWork). The
- * "queue" is a form of Treiber stack. This is ideal for
- * activating threads in most-recently used order, and improves
- * performance and locality, outweighing the disadvantages of
- * being prone to contention and inability to release a worker
- * unless it is topmost on stack. The top stack state holds the
- * value of the "phase" field of the worker: its index and status,
- * plus a version counter that, in addition to the count subfields
- * (also serving as version stamps) provide protection against
- * Treiber stack ABA effects.
+ * and/or running tasks (we cannot accurately determine
+ * which). Unreleased ("available") workers are recorded in the
+ * ctl stack. These workers are made eligible for signalling by
+ * enqueuing in ctl (see method runWorker). This "queue" is a
+ * form of Treiber stack. This is ideal for activating threads in
+ * most-recently used order, and improves performance and
+ * locality, outweighing the disadvantages of being prone to
+ * contention and inability to release a worker unless it is
+ * topmost on stack. The top stack state holds the value of the
+ * "phase" field of the worker: its index and status, plus a
+ * version counter that, in addition to the count subfields (also
+ * serving as version stamps) provide protection against Treiber
+ * stack ABA effects.
*
* Creating workers. To create a worker, we pre-increment counts
* (serving as a reservation), and attempt to construct a
@@ -473,16 +513,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* workers. If exceptional, the exception is propagated, generally
* to some external caller.
*
- * WorkQueue field "phase" is used by both workers and the pool to
- * manage and track whether a worker is unsignalled (possibly
- * blocked waiting for a signal), conveniently using the sign bit
- * to check. When a worker is enqueued its phase field is set
- * negative. Note that phase field updates lag queue CAS releases;
- * seeing a negative phase does not guarantee that the worker is
- * available (and so is never checked in this way). When queued,
- * the lower 16 bits of its phase must hold its pool index. So we
- * place the index there upon initialization and never modify
- * these bits.
+ * WorkQueue field "phase" encodes the queue array id in lower
+ * bits, and otherwise acts similarly to the pool runState field:
+ * The "IDLE" bit is clear while active (either a released worker
+ * or a locked external queue), with other bits serving as a
+ * version counter to distinguish changes across multiple reads.
+ * Note that phase field updates lag queue CAS releases; seeing a
+ * non-idle phase does not guarantee that the worker is available
+ * (and so is never checked in this way).
*
* The ctl field also serves as the basis for memory
* synchronization surrounding activation. This uses a more
@@ -498,178 +536,230 @@ public class ForkJoinPool extends AbstractExecutorService {
* workers to scan for tasks. Method signalWork and its callers
* try to approximate the unattainable goal of having the right
* number of workers activated for the tasks at hand, but must err
- * on the side of too many workers vs too few to avoid stalls. If
- * computations are purely tree structured, it suffices for every
- * worker to activate another when it pushes a task into an empty
- * queue, resulting in O(log(#threads)) steps to full activation.
- * (To reduce resource usages in some cases, at the expense of
- * slower startup in others, activation of an idle thread is
- * preferred over creating a new one, here and elsewhere.) If
- * instead, tasks come in serially from only a single producer,
- * each worker taking its first (since the last activation) task
- * from a queue should signal another if there are more tasks in
- * that queue. This is equivalent to, but generally faster than,
- * arranging the stealer take two tasks, re-pushing one on its own
- * queue, and signalling (because its queue is empty), also
- * resulting in logarithmic full activation time. Because we don't
- * know about usage patterns (or most commonly, mixtures), we use
- * both approaches. Together these are minimally necessary for
- * maintaining liveness. However, they do not account for the fact
- * that when tasks are short-lived, signals are unnecessary
- * because workers will already be scanning for new tasks without
- * the need of new signals. We track these cases (variable
- * "prevSrc" in scan() and related methods) to avoid some
- * unnecessary signals and scans. However, signal contention and
- * overhead effects may still occur during ramp-up, ramp-down, and
- * small computations involving only a few workers.
+ * on the side of too many workers vs too few to avoid stalls:
+ *
+ * * If computations are purely tree structured, it suffices for
+ * every worker to activate another when it pushes a task into
+ * an empty queue, resulting in O(log(#threads)) steps to full
+ * activation. Emptiness must be conservatively approximated,
+ * sometimes resulting in unnecessary signals. Also, to reduce
+ * resource usages in some cases, at the expense of slower
+ * startup in others, activation of an idle thread is preferred
+ * over creating a new one, here and elsewhere.
+ *
+ * * If instead, tasks come in serially from only a single
+ * producer, each worker taking its first (since the last
+ * activation) task from a queue should propagate a signal if
+ * there are more tasks in that queue. This is equivalent to,
+ * but generally faster than, arranging the stealer take
+ * multiple tasks, re-pushing one or more on its own queue, and
+ * signalling (because its queue is empty), also resulting in
+ * logarithmic full activation time
+ *
+ * * Because we don't know about usage patterns (or most commonly,
+ * mixtures), we use both approaches, which present even more
+ * opportunities to over-signal. Note that in either of these
+ * contexts, signals may be (and often are) unnecessary because
+ * active workers continue scanning after running tasks without
+ * the need to be signalled (which is one reason work stealing
+ * is often faster than alternatives), so additional workers
+ * aren't needed. But there is no efficient way to detect this.
+ *
+ * * For rapidly branching tasks that require full pool resources,
+ * oversignalling is OK, because signalWork will soon have no
+ * more workers to create or reactivate. But for others (mainly
+ * externally submitted tasks), overprovisioning may cause very
+ * noticeable slowdowns due to contention and resource
+ * wastage. All remedies are intrinsically heuristic. We use a
+ * strategy that works well in most cases: We track "sources"
+ * (queue ids) of non-empty (usually polled) queues while
+ * scanning. These are maintained in the "source" field of
+ * WorkQueues for use in method helpJoin and elsewhere (see
+ * below). We also maintain them as arguments/results of
+ * top-level polls (argument "window" in method scan, with setup
+ * in method runWorker) as an encoded sliding window of current
+ * and previous two sources (or INVALID_ID if none), and stop
+ * signalling when all were from the same source. Also, retries
+ * are suppressed on CAS failures by newly activated workers,
+ * which serves as a form of admission control. These
+ * mechanisms may result in transiently too few workers, but
+ * once workers poll from a new source, they rapidly reactivate
+ * others.
+ *
+ * * Despite these, signal contention and overhead effects still
+ * occur during ramp-up and ramp-down of small computations.
*
* Scanning. Method scan performs top-level scanning for (and
* execution of) tasks by polling a pseudo-random permutation of
- * the array (by starting at a random index, and using a constant
- * cyclically exhaustive stride.) It uses the same basic polling
+ * the array (by starting at a given index, and using a constant
+ * cyclically exhaustive stride.) It uses the same basic polling
* method as WorkQueue.poll(), but restarts with a different
- * permutation on each invocation. (Non-top-level scans; for
- * example in helpJoin, use simpler and faster linear probes
- * because they do not systematically contend with top-level
- * scans.) The pseudorandom generator need not have high-quality
- * statistical properties in the long term. We use Marsaglia
- * XorShifts, seeded with the Weyl sequence from ThreadLocalRandom
- * probes, which are cheap and suffice. Scans do not otherwise
- * explicitly take into account core affinities, loads, cache
- * localities, etc, However, they do exploit temporal locality
- * (which usually approximates these) by preferring to re-poll
- * from the same queue (using method tryPoll()) after a successful
- * poll before trying others (see method topLevelExec), which also
- * reduces bookkeeping and scanning overhead. This also reduces
+ * permutation on each invocation. The pseudorandom generator
+ * need not have high-quality statistical properties in the long
+ * term. We use Marsaglia XorShifts, seeded with the Weyl sequence
+ * from ThreadLocalRandom probes, which are cheap and
+ * suffice. Scans do not otherwise explicitly take into account
+ * core affinities, loads, cache localities, etc, However, they do
+ * exploit temporal locality (which usually approximates these) by
+ * preferring to re-poll from the same queue (either in method
+ * tryPoll() or scan) after a successful poll before trying others
+ * (see method topLevelExec), which also reduces bookkeeping,
+ * cache traffic, and scanning overhead. But it also reduces
* fairness, which is partially counteracted by giving up on
* contention.
*
* Deactivation. When method scan indicates that no tasks are
- * found by a worker, it deactivates (see awaitWork). Note that
- * not finding tasks doesn't mean that there won't soon be
+ * found by a worker, it tries to deactivate (in runWorker). Note
+ * that not finding tasks doesn't mean that there won't soon be
* some. Further, a scan may give up under contention, returning
* even without knowing whether any tasks are still present, which
- * is OK, given the above signalling rules that will eventually
- * maintain progress. Blocking and unblocking via park/unpark can
- * cause serious slowdowns when tasks are rapidly but irregularly
- * generated (which is often due to garbage collectors and other
- * activities). One way to ameliorate is for workers to rescan
- * multiple times, even when there are unlikely to be tasks. But
- * this causes enough memory and CAS contention to prefer using
- * quieter spinwaits in awaitWork; currently set to small values
- * that only cover near-miss scenarios for deactivate vs activate
- * races. Because idle workers are often not yet blocked (via
- * LockSupport.park), we use the WorkQueue access field to
- * advertise that a waiter actually needs unparking upon signal.
- *
- * When idle workers are not continually woken up, the count
- * fields in ctl allow efficient and accurate discovery of
- * quiescent states (i.e., when all workers are idle) after
- * deactivation. However, this voting mechanism alone does not
- * guarantee that a pool can become dormant (quiesced or
- * terminated), because external racing producers do not vote, and
- * can asynchronously submit new tasks. To deal with this, the
- * final unparked thread (in awaitWork) scans external queues to
- * check for tasks that could have been added during a race window
- * that would not be accompanied by a signal, in which case
- * re-activating itself (or any other worker) to recheck. The same
- * sets of checks are used in tryTerminate, to correctly trigger
- * delayed termination (shutDown, followed by quiescence) in the
- * presence of racing submissions. In all cases, the notion of the
- * "final" unparked thread is an approximation, because new
- * workers could be in the process of being constructed, which
- * occasionally adds some extra unnecessary processing.
- *
- * Shutdown and Termination. A call to shutdownNow invokes
- * tryTerminate to atomically set a mode bit. The calling thread,
- * as well as every other worker thereafter terminating, helps
- * terminate others by cancelling their unprocessed tasks, and
- * interrupting other workers. Calls to non-abrupt shutdown()
- * preface this by checking isQuiescent before triggering the
- * "STOP" phase of termination. During termination, workers are
- * stopped using all three of (often in parallel): releasing via
- * ctl (method reactivate), interrupts, and cancelling tasks that
- * will cause workers to not find work and exit. To support this,
- * worker references not removed from the queues array during
- * termination. It is possible for late thread creations to still
- * be in progress after a quiescent termination reports terminated
- * status, but they will also immediately terminate. To conform to
- * ExecutorService invoke, invokeAll, and invokeAny specs, we must
- * track pool status while waiting in ForkJoinTask.awaitDone, and
- * interrupt interruptible callers on termination, while also
- * avoiding cancelling other tasks that are normally completing
- * during quiescent termination. This is tracked by recording
- * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
- * argument to joining methods.
+ * is OK given, a secondary check (in awaitWork) needed to cover
+ * deactivation/signal races. Blocking and unblocking via
+ * park/unpark can cause serious slowdowns when tasks are rapidly
+ * but irregularly generated (which is often due to garbage
+ * collectors and other activities). One way to ameliorate is for
+ * workers to rescan multiple times, even when there are unlikely
+ * to be tasks. But this causes enough memory traffic and CAS
+ * contention to prefer using quieter short spinwaits in awaitWork
+ * and elsewhere. Those in awaitWork are set to small values that
+ * only cover near-miss scenarios for inactivate/activate races.
+ * Because idle workers are often not yet blocked (parked), we use
+ * the WorkQueue parker field to advertise that a waiter actually
+ * needs unparking upon signal.
+ *
+ * Quiescence. Workers scan looking for work, giving up when they
+ * don't find any, without being sure that none are available.
+ * However, some required functionality relies on consensus about
+ * quiescence (also termination, discussed below). The count
+ * fields in ctl allow accurate discovery of states in which all
+ * workers are idle. However, because external (asynchronous)
+ * submitters are not part of this vote, these mechanisms
+ * themselves do not guarantee that the pool is in a quiescent
+ * state with respect to methods isQuiescent, shutdown (which
+ * begins termination when quiescent), helpQuiesce, and indirectly
+ * others including tryCompensate. Method quiescent() is
+ * used in all of these contexts. It provides checks that all
+ * workers are idle and there are no submissions that they could
+ * poll if they were not idle, retrying on inconsistent reads of
+ * queues and using the runState seqLock to retry on queue array
+ * updates. (It also reports quiescence if the pool is
+ * terminating.) A true report means only that there was a moment
+ * at which quiescence held. False negatives are inevitable (for
+ * example when queues indices lag updates, as described above),
+ * which is accommodated when (tentatively) idle by scanning for
+ * work etc, and then re-invoking. This includes cases in which
+ * the final unparked thread (in awaitWork) uses quiescent()
+ * to check for tasks that could have been added during a race
+ * window that would not be accompanied by a signal, in which case
+ * re-activating itself (or any other worker) to rescan. Method
+ * helpQuiesce acts similarly but cannot rely on ctl counts to
+ * determine that all workers are inactive because the caller and
+ * any others executing helpQuiesce are not included in counts.
+ *
+ * Termination. A call to shutdownNow invokes tryTerminate to
+ * atomically set a runState mode bit. However, the process of
+ * termination is intrinsically non-atomic. The calling thread, as
+ * well as other workers thereafter terminating help cancel queued
+ * tasks and interrupt other workers. These actions race with
+ * unterminated workers. By default, workers check for
+ * termination only when accessing pool state. This may take a
+ * while but suffices for structured computational tasks. But not
+ * necessarily for others. Class InterruptibleTask (see below)
+ * further arranges runState checks before executing task bodies,
+ * and ensures interrupts while terminating. Even so, there are no
+ * guarantees after an abrupt shutdown that remaining tasks
+ * complete normally or exceptionally or are cancelled.
+ * Termination may fail to complete if running tasks ignore both
+ * task status and interrupts and/or produce more tasks after
+ * others that could cancel them have exited.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
* time out and terminate if the pool has remained quiescent for
- * period given by field keepAlive.
+ * period given by field keepAlive (default 60sec), which applies
+ * to the first timeout of a fully populated pool. Subsequent (or
+ * other) cases use delays such that, if still quiescent, all will
+ * be released before one additional keepAlive unit elapses.
*
* Joining Tasks
* =============
*
- * Normally, the first option when joining a task that is not done
- * is to try to take it from local queue and run it. Otherwise,
- * any of several actions may be taken when one worker is waiting
- * to join a task stolen (or always held) by another. Because we
- * are multiplexing many tasks on to a pool of workers, we can't
- * always just let them block (as in Thread.join). We also cannot
- * just reassign the joiner's run-time stack with another and
- * replace it later, which would be a form of "continuation", that
- * even if possible is not necessarily a good idea since we may
- * need both an unblocked task and its continuation to progress.
- * Instead we combine two tactics:
- *
- * Helping: Arranging for the joiner to execute some task that it
- * could be running if the steal had not occurred.
- *
- * Compensating: Unless there are already enough live threads,
- * method tryCompensate() may create or re-activate a spare
- * thread to compensate for blocked joiners until they unblock.
- *
- * A third form (implemented via tryRemove) amounts to helping a
- * hypothetical compensator: If we can readily tell that a
- * possible action of a compensator is to steal and execute the
- * task being joined, the joining thread can do so directly,
- * without the need for a compensation thread; although with a
- * possibility of reduced parallelism because of a transient gap
- * in the queue array that stalls stealers.
- *
- * Other intermediate forms available for specific task types (for
- * example helpAsyncBlocker) often avoid or postpone the need for
- * blocking or compensation.
- *
- * The ManagedBlocker extension API can't use helping so relies
- * only on compensation in method awaitBlocker.
+ * The "Join" part of ForkJoinPools consists of a set of
+ * mechanisms that sometimes or always (depending on the kind of
+ * task) avoid context switching or adding worker threads when one
+ * task would otherwise be blocked waiting for completion of
+ * another, basically, just by running that task or one of its
+ * subtasks if not already taken. These mechanics are disabled for
+ * InterruptibleTasks, that guarantee that callers do not executed
+ * submitted tasks.
+ *
+ * The basic structure of joining is an extended spin/block scheme
+ * in which workers check for task completions status between
+ * steps to find other work, until relevant pool state stabilizes
+ * enough to believe that no such tasks are available, at which
+ * point blocking. This is usually a good choice of when to block
+ * that would otherwise be harder to approximate.
+ *
+ * These forms of helping may increase stack space usage, but that
+ * space is bounded in tree/dag structured procedurally parallel
+ * designs to be no more than that if a task were executed only by
+ * the joining thread. This is arranged by associated task
+ * subclasses that also help detect and control the ways in which
+ * this may occur.
*
- * The algorithm in helpJoin entails a form of "linear helping".
- * Each worker records (in field "source") a reference to the
- * queue from which it last stole a task. The scan in method
+ * Normally, the first option when joining a task that is not done
+ * is to try to take it from the local queue and run it. Method
+ * tryRemoveAndExec tries to do so. For tasks with any form of
+ * subtasks that must be completed first, we try to locate these
+ * subtasks and run them as well. This is easy when local, but
+ * when stolen, steal-backs are restricted to the same rules as
+ * stealing (polling), which requires additional bookkeeping and
+ * scanning. This cost is still very much worthwhile because of
+ * its impact on task scheduling and resource control.
+ *
+ * The two methods for finding and executing subtasks vary in
+ * details. The algorithm in helpJoin entails a form of "linear
+ * helping". Each worker records (in field "source") the index of
+ * the internal queue from which it last stole a task. (Note:
+ * because chains cannot include even-numbered external queues,
+ * they are ignored, and 0 is an OK default.) The scan in method
* helpJoin uses these markers to try to find a worker to help
- * (i.e., steal back a task from and execute it) that could hasten
- * completion of the actively joined task. Thus, the joiner
- * executes a task that would be on its own local deque if the
- * to-be-joined task had not been stolen. This is a conservative
- * variant of the approach described in Wagner & Calder
- * "Leapfrogging: a portable technique for implementing efficient
- * futures" SIGPLAN Notices, 1993
+ * (i.e., steal back a task from and execute it) that could make
+ * progress toward completion of the actively joined task. Thus,
+ * the joiner executes a task that would be on its own local deque
+ * if the to-be-joined task had not been stolen. This is a
+ * conservative variant of the approach described in Wagner &
+ * Calder "Leapfrogging: a portable technique for implementing
+ * efficient futures" SIGPLAN Notices, 1993
* (http://portal.acm.org/citation.cfm?id=155354). It differs
* mainly in that we only record queues, not full dependency
- * links. This requires a linear scan of the queues array to
- * locate stealers, but isolates cost to when it is needed, rather
- * than adding to per-task overhead. For CountedCompleters, the
+ * links. This requires a linear scan of the queues to locate
+ * stealers, but isolates cost to when it is needed, rather than
+ * adding to per-task overhead. For CountedCompleters, the
* analogous method helpComplete doesn't need stealer-tracking,
- * but requires a similar check of completion chains.
+ * but requires a similar (but simpler) check of completion
+ * chains.
*
* In either case, searches can fail to locate stealers when
- * stalls delay recording sources. We avoid some of these cases by
- * using snapshotted values of ctl as a check that the numbers of
- * workers are not changing. But even when accurately identified,
- * stealers might not ever produce a task that the joiner can in
- * turn help with. So, compensation is tried upon failure to find
- * tasks to run.
+ * stalls delay recording sources or issuing subtasks. We avoid
+ * some of these cases by using snapshotted values of ctl as a
+ * check that the numbers of workers are not changing, along with
+ * rescans to deal with contention and stalls. But even when
+ * accurately identified, stealers might not ever produce a task
+ * that the joiner can in turn help with.
+ *
+ * Related method helpAsyncBlocker does not directly rely on
+ * subtask structure, but instead avoids or postpones blocking of
+ * tagged tasks (CompletableFuture.AsynchronousCompletionTask) by
+ * executing other asyncs that can be processed in any order.
+ * This is currently invoked only in non-join-based blocking
+ * contexts from classes CompletableFuture and
+ * SubmissionPublisher, that could be further generalized.
+ *
+ * When any of the above fail to avoid blocking, we rely on
+ * "compensation" -- an indirect form of context switching that
+ * either activates an existing worker to take the place of the
+ * blocked one, or expands the number of workers.
*
* Compensation does not by default aim to keep exactly the target
* parallelism number of unblocked threads running at any given
@@ -677,12 +767,28 @@ public class ForkJoinPool extends AbstractExecutorService {
* compensations for any blocked join. However, in practice, the
* vast majority of blockages are transient byproducts of GC and
* other JVM or OS activities that are made worse by replacement
- * when they cause longer-term oversubscription. Rather than
- * impose arbitrary policies, we allow users to override the
- * default of only adding threads upon apparent starvation. The
- * compensation mechanism may also be bounded. Bounds for the
- * commonPool better enable JVMs to cope with programming errors
- * and abuse before running out of resources to do so.
+ * by causing longer-term oversubscription. These are inevitable
+ * without (unobtainably) perfect information about whether worker
+ * creation is actually necessary. False alarms are common enough
+ * to negatively impact performance, so compensation is by default
+ * attempted only when it appears possible that the pool could
+ * stall due to lack of any unblocked workers. However, we allow
+ * users to override defaults using the long form of the
+ * ForkJoinPool constructor. The compensation mechanism may also
+ * be bounded. Bounds for the commonPool better enable JVMs to
+ * cope with programming errors and abuse before running out of
+ * resources to do so.
+ *
+ * The ManagedBlocker extension API can't use helping so relies
+ * only on compensation in method awaitBlocker. This API was
+ * designed to highlight the uncertainty of compensation decisions
+ * by requiring implementation of method isReleasable to abort
+ * compensation during attempts to obtain a stable snapshot. But
+ * users now rely upon the fact that if isReleasable always
+ * returns false, the API can be used to obtain precautionary
+ * compensation, which is sometimes the only reasonable option
+ * when running unknown code in tasks; which is now supported more
+ * simply (see method beginCompensatedBlock).
*
* Common Pool
* ===========
@@ -691,32 +797,31 @@ public class ForkJoinPool extends AbstractExecutorService {
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, although with
- * some System property parsing and with security processing that
- * takes far longer than the actual construction when
- * SecurityManagers are used or properties are set. The common
- * pool is distinguished internally by having both a null
- * workerNamePrefix and ISCOMMON config bit set, along with
- * PRESET_SIZE set if parallelism was configured by system
- * property.
- *
- * When external threads use ForkJoinTask.fork for the common
- * pool, they can perform subtask processing (see helpComplete and
- * related methods) upon joins. This caller-helps policy makes it
- * sensible to set common pool parallelism level to one (or more)
- * less than the total number of available cores, or even zero for
- * pure caller-runs. For the sake of ExecutorService specs, we can
- * only do this for tasks entered via fork, not submit. We track
- * this using a task status bit (markPoolSubmission). In all
- * other cases, external threads waiting for joins first check the
- * common pool for their task, which fails quickly if the caller
- * did not fork to common pool.
+ * some System property parsing and security processing that takes
+ * far longer than the actual construction when SecurityManagers
+ * are used or properties are set. The common pool is
+ * distinguished by having a null workerNamePrefix (which is an
+ * odd convention, but avoids the need to decode status in factory
+ * classes). It also has PRESET_SIZE config set if parallelism
+ * was configured by system property.
+ *
+ * When external threads use the common pool, they can perform
+ * subtask processing (see helpComplete and related methods) upon
+ * joins, unless they are submitted using ExecutorService
+ * submission methods, which implicitly disallow this. This
+ * caller-helps policy makes it sensible to set common pool
+ * parallelism level to one (or more) less than the total number
+ * of available cores, or even zero for pure caller-runs. External
+ * threads waiting for joins first check the common pool for their
+ * task, which fails quickly if the caller did not fork to common
+ * pool.
*
* Guarantees for common pool parallelism zero are limited to
* tasks that are joined by their callers in a tree-structured
* fashion or use CountedCompleters (as is true for jdk
* parallelStreams). Support infiltrates several methods,
- * including those that retry helping steps or spin until we are
- * sure that none apply if there are no workers.
+ * including those that retry helping steps until we are sure that
+ * none apply if there are no workers.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
@@ -727,55 +832,82 @@ public class ForkJoinPool extends AbstractExecutorService {
* may be JVM-dependent and must access particular Thread class
* fields to achieve this effect.
*
- * Interrupt handling
- * ==================
- *
- * The framework is designed to manage task cancellation
- * (ForkJoinTask.cancel) independently from the interrupt status
- * of threads running tasks. (See the public ForkJoinTask
- * documentation for rationale.) Interrupts are issued only in
- * tryTerminate, when workers should be terminating and tasks
- * should be cancelled anyway. Interrupts are cleared only when
- * necessary to ensure that calls to LockSupport.park do not loop
- * indefinitely (park returns immediately if the current thread is
- * interrupted). For cases in which task bodies are specified or
- * desired to interrupt upon cancellation, ForkJoinTask.cancel can
- * be overridden to do so (as is done for invoke{Any,All}).
+ * InterruptibleTasks
+ * ====================
+ *
+ * Regular ForkJoinTasks manage task cancellation (method cancel)
+ * independently from the interrupt status of threads running
+ * tasks. Interrupts are issued internally only while
+ * terminating, to wake up workers and cancel queued tasks. By
+ * default, interrupts are cleared only when necessary to ensure
+ * that calls to LockSupport.park do not loop indefinitely (park
+ * returns immediately if the current thread is interrupted).
+ *
+ * To comply with ExecutorService specs, we use subclasses of
+ * abstract class InterruptibleTask for tasks that require
+ * stronger interruption and cancellation guarantees. External
+ * submitters never run these tasks, even if in the common pool.
+ * InterruptibleTasks include a "runner" field (implemented
+ * similarly to FutureTask) to support cancel(true). Upon pool
+ * shutdown, runners are interrupted so they can cancel. Since
+ * external joining callers never run these tasks, they must await
+ * cancellation by others, which can occur along several different
+ * paths.
+ *
+ * Across these APIs, rules for reporting exceptions for tasks
+ * with results accessed via join() differ from those via get(),
+ * which differ from those invoked using pool submit methods by
+ * non-workers (which comply with Future.get() specs). Internal
+ * usages of ForkJoinTasks ignore interrupt status when executing
+ * or awaiting completion. Otherwise, reporting task results or
+ * exceptions is preferred to throwing InterruptedExecptions,
+ * which are in turn preferred to timeouts. Similarly, completion
+ * status is preferred to reporting cancellation. Cancellation is
+ * reported as an unchecked exception by join(), and by worker
+ * calls to get(), but is otherwise wrapped in a (checked)
+ * ExecutionException.
+ *
+ * Worker Threads cannot be VirtualThreads, as enforced by
+ * requiring ForkJoinWorkerThreads in factories. There are
+ * several constructions relying on this. However as of this
+ * writing, virtual thread bodies are by default run as some form
+ * of InterruptibleTask.
*
* Memory placement
* ================
*
* Performance is very sensitive to placement of instances of
- * ForkJoinPool and WorkQueues and their queue arrays, as well the
- * placement of their fields. Caches misses and contention due to
- * false-sharing have been observed to slow down some programs by
- * more than a factor of four. There is no perfect solution, in
- * part because isolating more fields also generates more cache
- * misses in more common cases (because some fields snd slots are
- * usually read at the same time), and the main means of placing
- * memory, the @Contended annotation provides only rough control
- * (for good reason). We isolate the ForkJoinPool.ctl field as
- * well the set of WorkQueue fields that otherwise cause the most
- * false-sharing misses with respect to other fields. Also,
- * ForkJoinPool fields are ordered such that fields less prone to
- * contention effects are first, offsetting those that otherwise
- * would be, while also reducing total footprint vs using
- * multiple @Contended regions, which tends to slow down
- * less-contended applications. These arrangements mainly reduce
- * cache traffic by scanners, which speeds up finding tasks to
- * run. Initial sizing and resizing of WorkQueue arrays is an
- * even more delicate tradeoff because the best strategy may vary
- * across garbage collectors. Small arrays are better for locality
- * and reduce GC scan time, but large arrays reduce both direct
- * false-sharing and indirect cases due to GC bookkeeping
+ * ForkJoinPool and WorkQueues and their queue arrays, as well as
+ * the placement of their fields. Caches misses and contention due
+ * to false-sharing have been observed to slow down some programs
+ * by more than a factor of four. Effects may vary across initial
+ * memory configuarations, applications, and different garbage
+ * collectors and GC settings, so there is no perfect solution.
+ * Too much isolation may generate more cache misses in common
+ * cases (because some fields snd slots are usually read at the
+ * same time). The @Contended annotation provides only rough
+ * control (for good reason). Similarly for relying on fields
+ * being placed in size-sorted declaration order.
+ *
+ * For class ForkJoinPool, it is usually more effective to order
+ * fields such that the most commonly accessed fields are unlikely
+ * to share cache lines with adjacent objects under JVM layout
+ * rules. For class WorkQueue, an embedded @Contended region
+ * segregates fields most heavily updated by owners from those
+ * most commonly read by stealers or other management. Initial
+ * sizing and resizing of WorkQueue arrays is an even more
+ * delicate tradeoff because the best strategy systematically
+ * varies across garbage collectors. Small arrays are better for
+ * locality and reduce GC scan time, but large arrays reduce both
+ * direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
- * not especially fast because they require atomic transfers, and
- * may cause other scanning workers to stall or give up.
+ * not especially fast because they require atomic transfers.
* Currently, arrays are initialized to be fairly small but early
* resizes rapidly increase size by more than a factor of two
* until very large. (Maintenance note: any changes in fields,
- * queues, or their uses must be accompanied by re-evaluation of
- * these placement and sizing decisions.)
+ * queues, or their uses, or JVM layout policies, must be
+ * accompanied by re-evaluation of these placement and sizing
+ * decisions.)
*
* Style notes
* ===========
@@ -787,7 +919,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* other jdk components that require early parallelism. This can
* be awkward and ugly, but also reflects the need to control
* outcomes across the unusual cases that arise in very racy code
- * with very few invariants. All fields are read into locals
+ * with very few invariants. All atomic task slot updates use
+ * Unsafe operations requiring offset positions, not indices, as
+ * computed by method slotOffset. All fields are read into locals
* before use, and null-checked if they are references, even if
* they can never be null under current usages. Usually,
* computations (held in local variables) are defined as soon as
@@ -818,7 +952,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* perform reasonably even when interpreted (not compiled).
*
* The order of declarations in this file is (with a few exceptions):
- * (1) Static constants
+ * (1) Static configuration constants
* (2) Static utility functions
* (3) Nested (static) classes
* (4) Fields, along with constants used when unpacking some of them
@@ -832,15 +966,19 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* The main sources of differences from previous version are:
*
- * * Use of Unsafe vs VarHandle, including re-instatement of some
- * constructions from pre-VarHandle versions.
- * * Reduced memory and signal contention, mainly by distinguishing
- * failure cases.
- * * Improved initialization, in part by preparing for possible
- * removal of SecurityManager
- * * Enable resizing (includes refactoring quiescence/termination)
- * * Unification of most internal vs external operations; some made
- * possible via use of WorkQueue.access, and POOLSUBMIT status in tasks.
+ * * New abstract class ForkJoinTask.InterruptibleTask ensures
+ * handling of tasks submitted under the ExecutorService
+ * API are consistent with specs.
+ * * Method quiescent() replaces previous quiescence-related
+ * checks, relying on versioning and sequence locking instead
+ * of ReentrantLock.
+ * * Termination processing now ensures that internal data
+ * structures are maintained consistently enough while stopping
+ * to interrupt all workers and cancel all tasks. It also uses a
+ * CountDownLatch instead of a Condition for termination because
+ * of lock change.
+ * * Many other changes to avoid performance regressions due
+ * to the above.
*/
// static configuration constants
@@ -860,7 +998,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* The default value for common pool maxSpares. Overridable using
* the "java.util.concurrent.ForkJoinPool.common.maximumSpares"
* system property. The default value is far in excess of normal
- * requirements, but also far short of MAX_CAP and typical OS
+ * requirements, but also far short of maximum capacity and typical OS
* thread limits, so allows JVMs to catch misuse/abuse before
* running out of resources needed to do so.
*/
@@ -872,26 +1010,42 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
- // Bounds
- static final int SWIDTH = 16; // width of short
- static final int SMASK = 0xffff; // short bits == max index
- static final int MAX_CAP = 0x7fff; // max #workers - 1
-
- // pool.runState and workQueue.access bits and sentinels
- static final int STOP = 1 << 31; // must be negative
- static final int SHUTDOWN = 1;
- static final int TERMINATED = 2;
- static final int PARKED = -1; // access value when parked
-
- // {pool, workQueue}.config bits
- static final int FIFO = 1 << 16; // fifo queue or access mode
- static final int SRC = 1 << 17; // set when stealable
- static final int CLEAR_TLS = 1 << 18; // set for Innocuous workers
- static final int TRIMMED = 1 << 19; // timed out while idle
- static final int ISCOMMON = 1 << 20; // set for common pool
- static final int PRESET_SIZE = 1 << 21; // size was set by property
-
- static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
+ // conversions among short, int, long
+ static final int SMASK = 0xffff; // (unsigned) short bits
+ static final long LMASK = 0xffffffffL; // lower 32 bits of long
+ static final long UMASK = ~LMASK; // upper 32 bits
+
+ // masks and sentinels for queue indices
+ static final int MAX_CAP = 0x7fff; // max # workers
+ static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
+ static final int INVALID_ID = 0x4000; // unused external queue id
+
+ // pool.runState bits
+ static final int STOP = 1 << 0; // terminating
+ static final int SHUTDOWN = 1 << 1; // terminate when quiescent
+ static final int TERMINATED = 1 << 2; // only set if STOP also set
+ static final int RS_LOCK = 1 << 3; // lowest seqlock bit
+
+ // spin/sleep limits for runState locking and elsewhere
+ static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
+ static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
+ static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos
+
+ // {pool, workQueue} config bits
+ static final int FIFO = 1 << 0; // fifo queue or access mode
+ static final int CLEAR_TLS = 1 << 1; // set for Innocuous workers
+ static final int PRESET_SIZE = 1 << 2; // size was set by property
+
+ // source history window packing used in scan() and runWorker()
+ static final long RESCAN = 1L << 63; // must be negative
+ static final long WMASK = ~(((long)SMASK) << 48); // id bits only
+ static final long NO_HISTORY = ((((long)INVALID_ID) << 32) | // no 3rd
+ (((long)INVALID_ID) << 16)); // no 2nd
+
+ // others
+ static final int DEREGISTERED = 1 << 31; // worker terminating
+ static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
+ static final int IDLE = 1 << 16; // phase seqlock/version count
/*
* Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
@@ -911,9 +1065,6 @@ public class ForkJoinPool extends AbstractExecutorService {
* Other updates of multiple subfields require CAS.
*/
- // Lower and upper word masks
- static final long SP_MASK = 0xffffffffL;
- static final long UC_MASK = ~SP_MASK;
// Release counts
static final int RC_SHIFT = 48;
static final long RC_UNIT = 0x0001L << RC_SHIFT;
@@ -922,13 +1073,27 @@ public class ForkJoinPool extends AbstractExecutorService {
static final int TC_SHIFT = 32;
static final long TC_UNIT = 0x0001L << TC_SHIFT;
static final long TC_MASK = 0xffffL << TC_SHIFT;
- // sp bits
- static final int SS_SEQ = 1 << 16; // version count
- static final int INACTIVE = 1 << 31; // phase bit when idle
+
+ /*
+ * All atomic operations on task arrays (queues) use Unsafe
+ * operations that take array offsets versus indices, based on
+ * array base and shift constants established during static
+ * initialization.
+ */
+ static final long ABASE;
+ static final int ASHIFT;
// Static utilities
/**
+ * Returns the array offset corresponding to the given index for
+ * Unsafe task queue operations
+ */
+ static long slotOffset(int index) {
+ return ((long)index << ASHIFT) + ABASE;
+ }
+
+ /**
* If there is a security manager, makes sure caller has
* permission to modify threads.
*/
@@ -1046,72 +1211,80 @@ public class ForkJoinPool extends AbstractExecutorService {
* submission. See above for descriptions and algorithms.
*/
static final class WorkQueue {
- int stackPred; // pool stack (ctl) predecessor link
- int config; // index, mode, ORed with SRC after init
- int base; // index of next slot for poll
+ // fields declared in order of their likely layout on most VMs
+ final ForkJoinWorkerThread owner; // null if shared
+ volatile Thread parker; // set when parking in awaitWork
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
- final ForkJoinWorkerThread owner; // owning thread or null if shared
+ int base; // index of next slot for poll
+ final int config; // mode bits
// fields otherwise causing more unnecessary false-sharing cache misses
@jdk.internal.vm.annotation.Contended("w")
int top; // index of next slot for push
@jdk.internal.vm.annotation.Contended("w")
- volatile int access; // values 0, 1 (locked), PARKED, STOP
+ volatile int phase; // versioned active status
@jdk.internal.vm.annotation.Contended("w")
- volatile int phase; // versioned, negative if inactive
+ int stackPred; // pool stack (ctl) predecessor link
@jdk.internal.vm.annotation.Contended("w")
- volatile int source; // source queue id in topLevelExec
+ volatile int source; // source queue id (or DEREGISTERED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
// Support for atomic operations
private static final Unsafe U;
- private static final long ACCESS;
private static final long PHASE;
- private static final long ABASE;
- private static final int ASHIFT;
+ private static final long BASE;
+ private static final long TOP;
+ private static final long SOURCE;
+ private static final long ARRAY;
- static ForkJoinTask<?> getAndClearSlot(ForkJoinTask<?>[] a, int i) {
- return (ForkJoinTask<?>)
- U.getAndSetReference(a, ((long)i << ASHIFT) + ABASE, null);
+ final void updateBase(int v) {
+ U.putIntVolatile(this, BASE, v);
+ }
+ final void updateTop(int v) {
+ U.putIntOpaque(this, TOP, v);
}
- static boolean casSlotToNull(ForkJoinTask<?>[] a, int i,
- ForkJoinTask<?> c) {
- return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE,
- c, null);
+ final void forgetSource() {
+ U.putIntOpaque(this, SOURCE, 0);
}
- final void forcePhaseActive() { // clear sign bit
- U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
+ final void updateArray(ForkJoinTask<?>[] a) {
+ U.getAndSetReference(this, ARRAY, a);
}
- final int getAndSetAccess(int v) {
- return U.getAndSetInt(this, ACCESS, v);
+ final void unlockPhase() {
+ U.getAndAddInt(this, PHASE, IDLE);
}
- final void releaseAccess() {
- U.putIntRelease(this, ACCESS, 0);
+ final boolean tryLockPhase() { // seqlock acquire
+ int p;
+ return (((p = phase) & IDLE) != 0 &&
+ U.compareAndSetInt(this, PHASE, p, p + IDLE));
}
/**
- * Constructor. For owned queues, most fields are initialized
+ * Constructor. For internal queues, most fields are initialized
* upon thread start in pool.registerWorker.
*/
- WorkQueue(ForkJoinWorkerThread owner, int config) {
+ WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
+ boolean clearThreadLocals) {
+ if (clearThreadLocals)
+ cfg |= CLEAR_TLS;
+ this.config = cfg;
+ top = base = 1;
+ this.phase = id;
this.owner = owner;
- this.config = config;
- base = top = 1;
}
/**
* Returns an exportable index (used by ForkJoinWorkerThread).
*/
final int getPoolIndex() {
- return (config & 0xffff) >>> 1; // ignore odd/even tag bit
+ return (phase & 0xffff) >>> 1; // ignore odd/even tag bit
}
/**
* Returns the approximate number of tasks in the queue.
*/
final int queueSize() {
- int unused = access; // for ordering effect
+ int unused = phase; // for ordering effect
return Math.max(top - base, 0); // ignore transient negative
}
@@ -1119,42 +1292,50 @@ public class ForkJoinPool extends AbstractExecutorService {
* Pushes a task. Called only by owner or if already locked
*
* @param task the task. Caller must ensure non-null.
- * @param pool the pool. Must be non-null unless terminating.
- * @param signalIfEmpty true if signal when pushing to empty queue
- * @throws RejectedExecutionException if array cannot be resized
+ * @param pool the pool to signal if was previously empty, else null
+ * @param internal if caller owns this queue
+ * @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool,
- boolean signalIfEmpty) {
- boolean resize = false;
- int s = top++, b = base, cap, m; ForkJoinTask<?>[] a;
- if ((a = array) != null && (cap = a.length) > 0) {
- if ((m = (cap - 1)) == s - b) {
- resize = true; // rapidly grow until large
- int newCap = (cap < 1 << 24) ? cap << 2 : cap << 1;
- ForkJoinTask<?>[] newArray;
+ boolean internal) {
+ int s = top, b = base, cap, m, room; ForkJoinTask<?>[] a;
+ if ((a = array) == null || (cap = a.length) <= 0 ||
+ (room = (m = cap - 1) - (s - b)) < 0) { // could not resize
+ if (!internal)
+ unlockPhase();
+ throw new RejectedExecutionException("Queue capacity exceeded");
+ }
+ top = s + 1;
+ long pos = slotOffset(m & s);
+ if (!internal)
+ U.putReference(a, pos, task); // inside lock
+ else
+ U.getAndSetReference(a, pos, task); // fully fenced
+ if (room == 0) { // resize for next time
+ int newCap; // rapidly grow until large
+ if ((newCap = (cap < 1 << 24) ? cap << 2 : cap << 1) > 0) {
+ ForkJoinTask<?>[] newArray = null;
try {
newArray = new ForkJoinTask<?>[newCap];
- } catch (Throwable ex) {
- top = s;
- access = 0;
- throw new RejectedExecutionException(
- "Queue capacity exceeded");
+ } catch (OutOfMemoryError ex) {
}
- if (newCap > 0) { // always true
- int newMask = newCap - 1, k = s;
- do { // poll old, push to new
- newArray[k-- & newMask] = task;
- } while ((task = getAndClearSlot(a, k & m)) != null);
+ if (newArray != null) { // else throw on next push
+ int newMask = newCap - 1; // poll old, push to new
+ for (int k = s, j = cap; j > 0; --j, --k) {
+ if ((newArray[k & newMask] =
+ (ForkJoinTask<?>)U.getAndSetReference(
+ a, slotOffset(k & m), null)) == null)
+ break; // lost to pollers
+ }
+ updateArray(newArray); // fully fenced
}
- array = newArray;
}
- else
- a[m & s] = task;
- getAndSetAccess(0); // for memory effects if owned
- if ((resize || (a[m & (s - 1)] == null && signalIfEmpty)) &&
- pool != null)
- pool.signalWork();
}
+ if (!internal)
+ unlockPhase();
+ if ((room == 0 || room >= m || a[m & (s - 1)] == null) &&
+ pool != null)
+ pool.signalWork();
}
/**
@@ -1162,36 +1343,35 @@ public class ForkJoinPool extends AbstractExecutorService {
* so acts as either local-pop or local-poll. Called only by owner.
* @param fifo nonzero if FIFO mode
*/
- final ForkJoinTask<?> nextLocalTask(int fifo) {
+ private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
ForkJoinTask<?>[] a = array;
- int p = top, s = p - 1, b = base, nb, cap;
- if (p - b > 0 && a != null && (cap = a.length) > 0) {
- do {
+ int b = base, p = top, cap;
+ if (a != null && (cap = a.length) > 0) {
+ for (int m = cap - 1, s, nb; p - b > 0; ) {
if (fifo == 0 || (nb = b + 1) == p) {
- if ((t = getAndClearSlot(a, (cap - 1) & s)) != null)
- top = s;
- break; // lost race for only task
+ if ((t = (ForkJoinTask<?>)U.getAndSetReference(
+ a, slotOffset(m & (s = p - 1)), null)) != null)
+ updateTop(s); // else lost race for only task
+ break;
}
- else if ((t = getAndClearSlot(a, (cap - 1) & b)) != null) {
- base = nb;
+ if ((t = (ForkJoinTask<?>)U.getAndSetReference(
+ a, slotOffset(m & b), null)) != null) {
+ updateBase(nb);
break;
}
- else {
- while (b == (b = base)) {
- U.loadFence();
- Thread.onSpinWait(); // spin to reduce memory traffic
- }
+ while (b == (b = base)) {
+ U.loadFence();
+ Thread.onSpinWait(); // spin to reduce memory traffic
}
- } while (p - b > 0);
- U.storeStoreFence(); // for timely index updates
+ }
}
return t;
}
/**
* Takes next task, if one exists, using configured mode.
- * (Always owned, never called for Common pool.)
+ * (Always internal, never called for Common pool.)
*/
final ForkJoinTask<?> nextLocalTask() {
return nextLocalTask(config & FIFO);
@@ -1199,24 +1379,25 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Pops the given task only if it is at the current top.
+ * @param task the task. Caller must ensure non-null.
+ * @param internal if caller owns this queue
*/
- final boolean tryUnpush(ForkJoinTask<?> task, boolean owned) {
+ final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
+ boolean taken = false;
ForkJoinTask<?>[] a = array;
- int p = top, s, cap, k;
- if (task != null && base != p && a != null && (cap = a.length) > 0 &&
- a[k = (cap - 1) & (s = p - 1)] == task) {
- if (owned || getAndSetAccess(1) == 0) {
- if (top != p || a[k] != task ||
- getAndClearSlot(a, k) == null)
- access = 0;
- else {
- top = s;
- access = 0;
- return true;
- }
+ int p = top, s = p - 1, cap, k;
+ if (a != null && (cap = a.length) > 0 &&
+ a[k = (cap - 1) & s] == task &&
+ (internal || tryLockPhase())) {
+ if (top == p &&
+ U.compareAndSetReference(a, slotOffset(k), task, null)) {
+ taken = true;
+ updateTop(s);
}
+ if (!internal)
+ unlockPhase();
}
- return false;
+ return taken;
}
/**
@@ -1224,7 +1405,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array;
- int cfg = config, p = top, b = base, cap;
+ int b = base, cfg = config, p = top, cap;
if (p != b && a != null && (cap = a.length) > 0) {
if ((cfg & FIFO) == 0)
return a[(cap - 1) & (p - 1)];
@@ -1240,60 +1421,52 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Polls for a task. Used only by non-owners in usually
- * uncontended contexts.
+ * Polls for a task. Used only by non-owners.
*
* @param pool if nonnull, pool to signal if more tasks exist
*/
final ForkJoinTask<?> poll(ForkJoinPool pool) {
- for (int b = base;;) {
- int cap; ForkJoinTask<?>[] a;
- if ((a = array) == null || (cap = a.length) <= 0)
- break; // currently impossible
- int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (b != (b = base)) // inconsistent
- ;
- else if (t != null && casSlotToNull(a, k, t)) {
- base = nb;
- U.storeFence();
- if (pool != null && a[nk] != null)
- pool.signalWork(); // propagate
- return t;
+ for (;;) {
+ ForkJoinTask<?>[] a = array;
+ int b = base, cap, k;
+ if (a == null || (cap = a.length) <= 0)
+ break;
+ ForkJoinTask<?> t = a[k = b & (cap - 1)];
+ U.loadFence();
+ if (base == b) {
+ Object o;
+ int nb = b + 1, nk = nb & (cap - 1);
+ if (t == null)
+ o = a[k];
+ else if (t == (o = U.compareAndExchangeReference(
+ a, slotOffset(k), t, null))) {
+ updateBase(nb);
+ if (a[nk] != null && pool != null)
+ pool.signalWork(); // propagate
+ return t;
+ }
+ if (o == null && a[nk] == null && array == a &&
+ (phase & (IDLE | 1)) != 0 && top - base <= 0)
+ break; // empty
}
- else if (array != a || a[k] != null)
- ; // stale
- else if (a[nk] == null && top - b <= 0)
- break; // empty
}
return null;
}
/**
- * Tries to poll next task in FIFO order, failing on
- * contention or stalls. Used only by topLevelExec to repoll
- * from the queue obtained from pool.scan.
+ * Tries to poll next task in FIFO order, failing without
+ * retries on contention or stalls. Used only by topLevelExec
+ * to repoll from the queue obtained from pool.scan.
*/
- final ForkJoinTask<?> tryPoll() {
- int b = base, cap; ForkJoinTask<?>[] a;
- if ((a = array) != null && (cap = a.length) > 0) {
- for (;;) {
- int k = (cap - 1) & b, nb = b + 1;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (b != (b = base))
- ; // inconsistent
- else if (t != null) {
- if (casSlotToNull(a, k, t)) {
- base = nb;
- U.storeStoreFence();
- return t;
- }
- break; // contended
- }
- else if (a[k] == null)
- break; // empty or stalled
+ private ForkJoinTask<?> tryPoll() {
+ ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b, cap, k;
+ if ((a = array) != null && (cap = a.length) > 0 &&
+ (t = a[k = (b = base) & (cap - 1)]) != null) {
+ U.loadFence();
+ if (base == b &&
+ U.compareAndSetReference(a, slotOffset(k), t, null)) {
+ updateBase(b + 1);
+ return t;
}
}
return null;
@@ -1303,59 +1476,64 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Runs the given (stolen) task if nonnull, as well as
- * remaining local tasks and/or others available from its
- * source queue, if any.
+ * remaining local tasks and/or others available from the
+ * given queue, if any.
*/
- final void topLevelExec(ForkJoinTask<?> task, WorkQueue src) {
- int cfg = config, fifo = cfg & FIFO, nstolen = 1;
+ final void topLevelExec(ForkJoinTask<?> task, WorkQueue src, int srcId) {
+ int cfg = config, fifo = cfg & FIFO, nstolen = nsteals + 1;
+ if ((srcId & 1) != 0) // don't record external sources
+ source = srcId;
+ if ((cfg & CLEAR_TLS) != 0)
+ ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
while (task != null) {
task.doExec();
- if ((task = nextLocalTask(fifo)) == null &&
- src != null && (task = src.tryPoll()) != null)
+ if ((task = nextLocalTask(fifo)) == null && src != null &&
+ (task = src.tryPoll()) != null)
++nstolen;
}
- nsteals += nstolen;
- source = 0;
- if ((cfg & CLEAR_TLS) != 0)
- ThreadLocalRandom.eraseThreadLocals(Thread.currentThread());
+ nsteals = nstolen;
+ forgetSource();
}
/**
* Deep form of tryUnpush: Traverses from top and removes and
- * runs task if present, shifting others to fill gap.
- * @return task status if removed, else 0
+ * runs task if present.
*/
- final int tryRemoveAndExec(ForkJoinTask<?> task, boolean owned) {
+ final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
ForkJoinTask<?>[] a = array;
- int p = top, s = p - 1, d = p - base, cap;
- if (task != null && d > 0 && a != null && (cap = a.length) > 0) {
- for (int m = cap - 1, i = s; ; --i) {
- ForkJoinTask<?> t; int k;
- if ((t = a[k = i & m]) == task) {
- if (!owned && getAndSetAccess(1) != 0)
- break; // fail if locked
- else if (top != p || a[k] != task ||
- getAndClearSlot(a, k) == null) {
- access = 0;
- break; // missed
- }
- else {
- if (i != s && i == base)
- base = i + 1; // avoid shift
- else {
- for (int j = i; j != s;) // shift down
- a[j & m] = getAndClearSlot(a, ++j & m);
- top = s;
+ int b = base, p = top, s = p - 1, d = p - b, cap;
+ if (a != null && (cap = a.length) > 0) {
+ for (int m = cap - 1, i = s; d > 0; --i, --d) {
+ ForkJoinTask<?> t; int k; boolean taken;
+ if ((t = a[k = i & m]) == null)
+ break;
+ if (t == task) {
+ long pos = slotOffset(k);
+ if (!internal && !tryLockPhase())
+ break; // fail if locked
+ if (taken =
+ (top == p &&
+ U.compareAndSetReference(a, pos, task, null))) {
+ if (i == s) // act as pop
+ updateTop(s);
+ else if (i == base) // act as poll
+ updateBase(i + 1);
+ else { // swap with top
+ U.putReferenceVolatile(
+ a, pos, (ForkJoinTask<?>)
+ U.getAndSetReference(
+ a, slotOffset(s & m), null));
+ updateTop(s);
}
- releaseAccess();
- return task.doExec();
}
- }
- else if (t == null || --d == 0)
+ if (!internal)
+ unlockPhase();
+ if (taken)
+ task.doExec();
break;
+ }
}
}
- return 0;
}
/**
@@ -1364,39 +1542,45 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param task root of computation
* @param limit max runs, or zero for no limit
- * @return task status on exit
+ * @return task status if known to be done
*/
- final int helpComplete(ForkJoinTask<?> task, boolean owned, int limit) {
+ final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {
int status = 0;
if (task != null) {
outer: for (;;) {
- ForkJoinTask<?>[] a; ForkJoinTask<?> t;
- int p, s, cap, k;
- if ((status = task.status) < 0)
- return status;
- if ((a = array) == null || (cap = a.length) <= 0 ||
- (t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null ||
- !(t instanceof CountedCompleter))
+ ForkJoinTask<?>[] a; ForkJoinTask<?> t; boolean taken;
+ int stat, p, s, cap, k;
+ if ((stat = task.status) < 0) {
+ status = stat;
+ break;
+ }
+ if ((a = array) == null || (cap = a.length) <= 0)
+ break;
+ if ((t = a[k = (cap - 1) & (s = (p = top) - 1)]) == null)
+ break;
+ if (!(t instanceof CountedCompleter))
break;
- for (CountedCompleter<?> f = (CountedCompleter<?>)t;;) {
+ CountedCompleter<?> f = (CountedCompleter<?>)t;
+ for (int steps = cap;;) { // bound path
if (f == task)
break;
- else if ((f = f.completer) == null)
- break outer; // ineligible
+ if ((f = f.completer) == null || --steps == 0)
+ break outer;
}
- if (!owned && getAndSetAccess(1) != 0)
- break; // fail if locked
- if (top != p || a[k] != t || getAndClearSlot(a, k) == null) {
- access = 0;
- break; // missed
- }
- top = s;
- releaseAccess();
+ if (!internal && !tryLockPhase())
+ break;
+ if (taken =
+ (top == p &&
+ U.compareAndSetReference(a, slotOffset(k), t, null)))
+ updateTop(s);
+ if (!internal)
+ unlockPhase();
+ if (!taken)
+ break;
t.doExec();
if (limit != 0 && --limit == 0)
break;
}
- status = task.status;
}
return status;
}
@@ -1408,67 +1592,50 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param blocker the blocker
*/
final void helpAsyncBlocker(ManagedBlocker blocker) {
- if (blocker != null) {
- for (;;) {
- int b = base, cap; ForkJoinTask<?>[] a;
- if ((a = array) == null || (cap = a.length) <= 0 || b == top)
- break;
- int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (base != b)
- ;
- else if (blocker.isReleasable())
- break;
- else if (a[k] != t)
- ;
- else if (t != null) {
- if (!(t instanceof CompletableFuture
- .AsynchronousCompletionTask))
- break;
- else if (casSlotToNull(a, k, t)) {
- base = nb;
- U.storeStoreFence();
- t.doExec();
- }
- }
- else if (a[nk] == null)
+ for (;;) {
+ ForkJoinTask<?>[] a; int b, cap, k;
+ if ((a = array) == null || (cap = a.length) <= 0)
+ break;
+ ForkJoinTask<?> t = a[k = (b = base) & (cap - 1)];
+ U.loadFence();
+ if (t == null) {
+ if (top - b <= 0)
break;
}
+ else if (!(t instanceof CompletableFuture
+ .AsynchronousCompletionTask))
+ break;
+ if (blocker != null && blocker.isReleasable())
+ break;
+ if (base == b && t != null &&
+ U.compareAndSetReference(a, slotOffset(k), t, null)) {
+ updateBase(b + 1);
+ t.doExec();
+ }
}
}
// misc
/**
- * Returns true if owned by a worker thread and not known to be blocked.
+ * Returns true if internal and not known to be blocked.
*/
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
- return (access != STOP && (wt = owner) != null &&
+ return ((wt = owner) != null && (phase & IDLE) != 0 &&
(s = wt.getState()) != Thread.State.BLOCKED &&
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
- /**
- * Called in constructors if ThreadLocals not preserved
- */
- final void setClearThreadLocals() {
- config |= CLEAR_TLS;
- }
-
static {
U = Unsafe.getUnsafe();
Class<WorkQueue> klass = WorkQueue.class;
- ACCESS = U.objectFieldOffset(klass, "access");
PHASE = U.objectFieldOffset(klass, "phase");
- Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
- ABASE = U.arrayBaseOffset(aklass);
- int scale = U.arrayIndexScale(aklass);
- ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
- if ((scale & (scale - 1)) != 0)
- throw new Error("array index scale not a power of two");
+ BASE = U.objectFieldOffset(klass, "base");
+ TOP = U.objectFieldOffset(klass, "top");
+ SOURCE = U.objectFieldOffset(klass, "source");
+ ARRAY = U.objectFieldOffset(klass, "array");
}
}
@@ -1500,27 +1667,21 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static volatile RuntimePermission modifyThreadPermission;
-
- // Instance fields
- volatile long stealCount; // collects worker nsteals
- volatile long threadIds; // for worker thread names
- final long keepAlive; // milliseconds before dropping if idle
- final long bounds; // min, max threads packed as shorts
- final int config; // static configuration bits
- volatile int runState; // SHUTDOWN, STOP, TERMINATED bits
- WorkQueue[] queues; // main registry
- final ReentrantLock registrationLock;
- Condition termination; // lazily constructed
- final String workerNamePrefix; // null for common pool
+ // fields declared in order of their likely layout on most VMs
+ volatile CountDownLatch termination; // lazily constructed
+ final Predicate<? super ForkJoinPool> saturate;
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
- final Predicate<? super ForkJoinPool> saturate;
final SharedThreadContainer container;
-
- @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
+ final String workerNamePrefix; // null for common pool
+ WorkQueue[] queues; // main registry
+ final long keepAlive; // milliseconds before dropping if idle
+ final long config; // static configuration bits
+ volatile long stealCount; // collects worker nsteals
+ volatile long threadIds; // for worker thread names
volatile long ctl; // main pool control
- @jdk.internal.vm.annotation.Contended("fjpctl") // colocate
int parallelism; // target number of workers
+ volatile int runState; // versioned, lockable
// Support for atomic operations
private static final Unsafe U;
@@ -1528,6 +1689,7 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final long RUNSTATE;
private static final long PARALLELISM;
private static final long THREADIDS;
+ private static final long TERMINATION;
private static final Object POOLIDS_BASE;
private static final long POOLIDS;
@@ -1540,9 +1702,6 @@ public class ForkJoinPool extends AbstractExecutorService {
private long getAndAddCtl(long v) {
return U.getAndAddLong(this, CTL, v);
}
- private int getAndBitwiseOrRunState(int v) {
- return U.getAndBitwiseOrInt(this, RUNSTATE, v);
- }
private long incrementThreadIds() {
return U.getAndAddLong(this, THREADIDS, 1L);
}
@@ -1555,6 +1714,51 @@ public class ForkJoinPool extends AbstractExecutorService {
private int getParallelismOpaque() {
return U.getIntOpaque(this, PARALLELISM);
}
+ private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
+ return (CountDownLatch)
+ U.compareAndExchangeReference(this, TERMINATION, null, x);
+ }
+
+ // runState operations
+
+ private int getAndBitwiseOrRunState(int v) { // for status bits
+ return U.getAndBitwiseOrInt(this, RUNSTATE, v);
+ }
+ private boolean casRunState(int c, int v) {
+ return U.compareAndSetInt(this, RUNSTATE, c, v);
+ }
+ private void unlockRunState() { // increment lock bit
+ U.getAndAddInt(this, RUNSTATE, RS_LOCK);
+ }
+ private int lockRunState() { // lock and return current state
+ int s, u; // locked when RS_LOCK set
+ if (((s = runState) & RS_LOCK) == 0 && casRunState(s, u = s + RS_LOCK))
+ return u;
+ else
+ return spinLockRunState();
+ }
+ private int spinLockRunState() { // spin/sleep
+ for (int waits = 0, s, u;;) {
+ if (((s = runState) & RS_LOCK) == 0) {
+ if (casRunState(s, u = s + RS_LOCK))
+ return u;
+ waits = 0;
+ } else if (waits < SPIN_WAITS) {
+ ++waits;
+ Thread.onSpinWait();
+ } else {
+ if (waits < MIN_SLEEP)
+ waits = MIN_SLEEP;
+ LockSupport.parkNanos(this, (long)waits);
+ if (waits < MAX_SLEEP)
+ waits <<= 1;
+ }
+ }
+ }
+
+ static boolean poolIsStopping(ForkJoinPool p) { // Used by ForkJoinTask
+ return p != null && (p.runState & STOP) != 0;
+ }
// Creating, registering, and deregistering workers
@@ -1567,12 +1771,16 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
+ SharedThreadContainer ctr = container;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
- if (runState >= 0 && // avoid construction if terminating
+ if ((runState & STOP) == 0 && // avoid construction if terminating
fac != null && (wt = fac.newThread(this)) != null) {
- container.start(wt);
+ if (ctr != null)
+ ctr.start(wt);
+ else
+ wt.start();
return true;
}
} catch (Throwable rex) {
@@ -1594,49 +1802,49 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Finishes initializing and records owned queue.
+ * Finishes initializing and records internal queue.
*
* @param w caller's WorkQueue
*/
final void registerWorker(WorkQueue w) {
- ThreadLocalRandom.localInit();
- int seed = ThreadLocalRandom.getProbe();
- ReentrantLock lock = registrationLock;
- int cfg = config & FIFO;
- if (w != null && lock != null) {
+ if (w != null) {
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
- cfg |= w.config | SRC;
- w.stackPred = seed;
- int id = (seed << 1) | 1; // initial index guess
- lock.lock();
+ ThreadLocalRandom.localInit();
+ int seed = w.stackPred = ThreadLocalRandom.getProbe();
+ int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
+ int id = ((seed << 1) | 1) & SMASK; // base of linear-probe-like scan
+ int stop = lockRunState() & STOP;
try {
- WorkQueue[] qs; int n; // find queue index
- if ((qs = queues) != null && (n = qs.length) > 0) {
- int k = n, m = n - 1;
- for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2);
- if (k == 0)
- id = n | 1; // resize below
- w.phase = w.config = id | cfg; // now publishable
-
+ WorkQueue[] qs; int n;
+ if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0) {
+ for (int k = n, m = n - 1; ; id += 2) {
+ if (qs[id &= m] == null)
+ break;
+ if ((k -= 2) <= 0) {
+ id |= n;
+ break;
+ }
+ }
+ w.phase = id | phaseSeq; // now publishable
if (id < n)
qs[id] = w;
- else { // expand array
+ else { // expand
int an = n << 1, am = an - 1;
WorkQueue[] as = new WorkQueue[an];
as[id & am] = w;
for (int j = 1; j < n; j += 2)
as[j] = qs[j];
for (int j = 0; j < n; j += 2) {
- WorkQueue q;
- if ((q = qs[j]) != null) // shared queues may move
- as[q.config & am] = q;
+ WorkQueue q; // shared queues may move
+ if ((q = qs[j]) != null)
+ as[q.phase & EXTERNAL_ID_MASK & am] = q;
}
- U.storeFence(); // fill before publish
+ U.storeFence(); // fill before publish
queues = as;
}
}
} finally {
- lock.unlock();
+ unlockRunState();
}
}
}
@@ -1651,146 +1859,159 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
- WorkQueue w = (wt == null) ? null : wt.workQueue;
- int cfg = (w == null) ? 0 : w.config;
+ WorkQueue w = null;
+ int src = 0, phase = 0;
+ boolean replaceable = false;
+ if (wt != null && (w = wt.workQueue) != null) {
+ phase = w.phase;
+ if ((src = w.source) != DEREGISTERED) { // else trimmed on timeout
+ w.source = DEREGISTERED;
+ if (phase != 0) { // else failed to start
+ replaceable = true;
+ if ((phase & IDLE) != 0)
+ reactivate(w); // pool stopped before released
+ }
+ }
+ }
long c = ctl;
- if ((cfg & TRIMMED) == 0) // decrement counts
+ if (src != DEREGISTERED) // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
- (SP_MASK & c)))));
- else if ((int)c == 0) // was dropped on timeout
- cfg &= ~SRC; // suppress signal if last
- if (!tryTerminate(false, false) && w != null) {
- ReentrantLock lock; WorkQueue[] qs; int n, i;
- long ns = w.nsteals & 0xffffffffL;
- if ((lock = registrationLock) != null) {
- lock.lock(); // remove index unless terminating
- if ((qs = queues) != null && (n = qs.length) > 0 &&
- qs[i = cfg & (n - 1)] == w)
- qs[i] = null;
- stealCount += ns; // accumulate steals
- lock.unlock();
+ (LMASK & c)))));
+ else if ((int)c == 0) // was dropped on timeout
+ replaceable = false;
+ if (w != null) { // cancel remaining tasks
+ for (ForkJoinTask<?> t; (t = w.nextLocalTask()) != null; ) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
+ }
}
- if ((cfg & SRC) != 0)
- signalWork(); // possibly replace worker
- }
- if (ex != null) {
- if (w != null) {
- w.access = STOP; // cancel tasks
- for (ForkJoinTask<?> t; (t = w.nextLocalTask(0)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
+ }
+ if ((tryTerminate(false, false) & STOP) == 0 && w != null) {
+ WorkQueue[] qs; int n, i; // remove index unless terminating
+ long ns = w.nsteals & 0xffffffffL;
+ int stop = lockRunState() & STOP;
+ if (stop == 0 && (qs = queues) != null && (n = qs.length) > 0 &&
+ qs[i = phase & SMASK & (n - 1)] == w) {
+ qs[i] = null;
+ stealCount += ns; // accumulate steals
}
- ForkJoinTask.rethrow(ex);
+ unlockRunState();
}
+ if ((runState & STOP) == 0 && replaceable)
+ signalWork(); // may replace unless trimmed or uninitialized
+ if (ex != null)
+ ForkJoinTask.rethrow(ex);
}
- /*
+ /**
* Releases an idle worker, or creates one if not enough exist.
*/
final void signalWork() {
- int pc = parallelism, n;
- long c = ctl;
- WorkQueue[] qs = queues;
- if ((short)(c >>> RC_SHIFT) < pc && qs != null && (n = qs.length) > 0) {
- for (;;) {
- boolean create = false;
- int sp = (int)c & ~INACTIVE;
- WorkQueue v = qs[sp & (n - 1)];
- int deficit = pc - (short)(c >>> TC_SHIFT);
- long ac = (c + RC_UNIT) & RC_MASK, nc;
- if (sp != 0 && v != null)
- nc = (v.stackPred & SP_MASK) | (c & TC_MASK);
- else if (deficit <= 0)
- break;
- else {
- create = true;
- nc = ((c + TC_UNIT) & TC_MASK);
- }
- if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
- if (create)
- createWorker();
- else {
- Thread owner = v.owner;
- v.phase = sp;
- if (v.access == PARKED)
- LockSupport.unpark(owner);
- }
+ int pc = parallelism;
+ for (long c = ctl;;) {
+ WorkQueue[] qs = queues;
+ long ac = (c + RC_UNIT) & RC_MASK, nc;
+ int sp = (int)c, i = sp & SMASK;
+ if (qs == null || qs.length <= i)
+ break;
+ WorkQueue w = qs[i], v = null;
+ if (sp == 0) {
+ if ((short)(c >>> TC_SHIFT) >= pc)
break;
- }
+ nc = ((c + TC_UNIT) & TC_MASK);
}
- }
- }
-
- /**
- * Reactivates any idle worker, if one exists.
- *
- * @return the signalled worker, or null if none
- */
- private WorkQueue reactivate() {
- WorkQueue[] qs; int n;
- long c = ctl;
- if ((qs = queues) != null && (n = qs.length) > 0) {
- for (;;) {
- int sp = (int)c & ~INACTIVE;
- WorkQueue v = qs[sp & (n - 1)];
- long ac = UC_MASK & (c + RC_UNIT);
- if (sp == 0 || v == null)
- break;
- if (c == (c = compareAndExchangeCtl(
- c, (v.stackPred & SP_MASK) | ac))) {
- Thread owner = v.owner;
+ else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
+ break;
+ else
+ nc = (v.stackPred & LMASK) | (c & TC_MASK);
+ if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
+ if (v == null)
+ createWorker();
+ else {
+ Thread t;
v.phase = sp;
- if (v.access == PARKED)
- LockSupport.unpark(owner);
- return v;
+ if ((t = v.parker) != null)
+ U.unpark(t);
}
+ break;
}
}
- return null;
}
/**
- * Tries to deactivate worker w; called only on idle timeout.
+ * Reactivates the given worker, and possibly interrupts others if
+ * not top of ctl stack. Called only during shutdown to ensure release
+ * on termination.
*/
- private boolean tryTrim(WorkQueue w) {
- if (w != null) {
- int pred = w.stackPred, cfg = w.config | TRIMMED;
- long c = ctl;
- int sp = (int)c & ~INACTIVE;
- if ((sp & SMASK) == (cfg & SMASK) &&
- compareAndSetCtl(c, ((pred & SP_MASK) |
- (UC_MASK & (c - TC_UNIT))))) {
- w.config = cfg; // add sentinel for deregisterWorker
- w.phase = sp;
- return true;
+ private void reactivate(WorkQueue w) {
+ for (long c = ctl;;) {
+ WorkQueue[] qs; WorkQueue v; int sp, i;
+ if ((qs = queues) == null || (sp = (int)c) == 0 ||
+ qs.length <= (i = sp & SMASK) || (v = qs[i]) == null ||
+ (v != w && w != null && (w.phase & IDLE) == 0))
+ break;
+ if (c == (c = compareAndExchangeCtl(
+ c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
+ (v.stackPred & LMASK))))) {
+ Thread t;
+ v.phase = sp;
+ if ((t = v.parker) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
+ }
+ }
+ if (v == w)
+ break;
}
}
- return false;
}
/**
- * Returns true if any queue is detectably nonempty. Accurate
- * only when workers are quiescent; else conservatively
- * approximate.
- * @param submissionsOnly if true, only check submission queues
- */
- private boolean hasTasks(boolean submissionsOnly) {
- int step = submissionsOnly ? 2 : 1;
- for (int checkSum = 0;;) { // repeat until stable (normally twice)
- U.loadFence();
- WorkQueue[] qs = queues;
- int n = (qs == null) ? 0 : qs.length, sum = 0;
- for (int i = 0; i < n; i += step) {
- WorkQueue q; int s;
- if ((q = qs[i]) != null) {
- if (q.access > 0 || (s = q.top) != q.base)
- return true;
- sum += (s << 16) + i + 1;
+ * Internal version of isQuiescent and related functionality.
+ * @return true if terminating or all workers are inactive and
+ * submission queues are empty and unlocked; if so, setting STOP
+ * if shutdown is enabled
+ */
+ private boolean quiescent() {
+ outer: for (;;) {
+ long phaseSum = 0L;
+ boolean swept = false;
+ for (int e, prevRunState = 0; ; prevRunState = e) {
+ long c = ctl;
+ if (((e = runState) & STOP) != 0)
+ return true; // terminating
+ else if ((c & RC_MASK) > 0L)
+ return false; // at least one active
+ else if (!swept || e != prevRunState || (e & RS_LOCK) != 0) {
+ long sum = c;
+ WorkQueue[] qs = queues; WorkQueue q;
+ int n = (qs == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) { // scan queues
+ if ((q = qs[i]) != null) {
+ int p = q.phase, s = q.top, b = q.base;
+ sum += (p & 0xffffffffL) | ((long)b << 32);
+ if ((p & IDLE) == 0 || s - b > 0) {
+ if ((i & 1) == 0 && compareAndSetCtl(c, c))
+ signalWork(); // ensure live
+ return false;
+ }
+ }
+ }
+ swept = (phaseSum == (phaseSum = sum));
}
+ else if (compareAndSetCtl(c, c) && // confirm
+ casRunState(e, (e & SHUTDOWN) != 0 ? e | STOP : e)) {
+ if ((e & SHUTDOWN) != 0) // enable termination
+ interruptAll();
+ return true;
+ }
+ else
+ break; // restart
}
- if (checkSum == (checkSum = sum))
- return false;
}
}
@@ -1801,129 +2022,139 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
- if (w != null) { // skip on failed init
- int r = w.stackPred, src = 0; // use seed from registerWorker
- do {
- r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- } while ((src = scan(w, src, r)) >= 0 ||
- (src = awaitWork(w)) == 0);
- w.access = STOP; // record normal termination
+ if (w != null) {
+ int phase = w.phase, r = w.stackPred; // seed from registerWorker
+ for (long window = NO_HISTORY | (r >>> 16);;) {
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ if ((runState & STOP) != 0) // terminating
+ break;
+ if (window == (window = scan(w, window & WMASK, r)) &&
+ window >= 0L && phase != (phase = awaitWork(w, phase))) {
+ if ((phase & IDLE) != 0)
+ break; // worker exit
+ window = NO_HISTORY | (window & SMASK); // clear history
+ }
+ }
}
}
/**
* Scans for and if found executes top-level tasks: Tries to poll
- * each queue starting at a random index with random stride,
- * returning source id or retry indicator.
+ * each queue starting at initial index with random stride,
+ * returning next scan window and retry indicator.
*
* @param w caller's WorkQueue
- * @param prevSrc the two previous queues (if nonzero) stolen from in current phase, packed as int
+ * @param window up to three queue indices
* @param r random seed
- * @return the next prevSrc value to use, or negative if none found
+ * @return the next window to use, with RESCAN set for rescan
*/
- private int scan(WorkQueue w, int prevSrc, int r) {
+ private long scan(WorkQueue w, long window, int r) {
WorkQueue[] qs = queues;
- int n = (w == null || qs == null) ? 0 : qs.length;
- for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
+ int n = (qs == null) ? 0 : qs.length, step = (r << 1) | 1;
+ outer: for (int i = (short)window, l = n; l > 0; --l, i += step) {
int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
- if ((q = qs[j = r & (n - 1)]) != null &&
+ if ((q = qs[j = i & SMASK & (n - 1)]) != null &&
(a = q.array) != null && (cap = a.length) > 0) {
- int src = j | SRC, b = q.base;
- int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (q.base != b) // inconsistent
- return prevSrc;
- else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
- q.base = nb;
- w.source = src;
- if (src + (src << SWIDTH) != prevSrc &&
- q.base == nb && a[nk] != null)
- signalWork(); // propagate at most twice/run
- w.topLevelExec(t, q);
- return src + (prevSrc << SWIDTH);
- }
- else if (q.array != a || a[k] != null || a[nk] != null)
- return prevSrc; // revisit
- }
- }
- return -1;
- }
-
- /**
- * Advances phase, enqueues, and awaits signal or termination.
- *
- * @return negative if terminated, else 0
- */
- private int awaitWork(WorkQueue w) {
- if (w == null)
- return -1; // currently impossible
- int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase
- boolean idle = false; // true if possibly quiescent
- if (runState < 0)
- return -1; // terminating
- long sp = p & SP_MASK, pc = ctl, qc;
- w.phase = p | INACTIVE;
- do { // enqueue
- w.stackPred = (int)pc; // set ctl stack link
- } while (pc != (pc = compareAndExchangeCtl(
- pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
- if ((qc & RC_MASK) <= 0L) {
- if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
- return 0; // check for stragglers
- if (runState != 0 && tryTerminate(false, false))
- return -1; // quiescent termination
- idle = true;
- }
- WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
- int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
- while ((p = w.phase) < 0 && --spins > 0)
- Thread.onSpinWait();
- if (p < 0) {
- long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
- LockSupport.setCurrentBlocker(this);
- for (;;) { // await signal or termination
- if (runState < 0)
- return -1;
- w.access = PARKED; // enable unpark
- if (w.phase < 0) {
- if (idle)
- LockSupport.parkUntil(deadline);
- else
- LockSupport.park();
- }
- w.access = 0; // disable unpark
- if (w.phase >= 0) {
- LockSupport.setCurrentBlocker(null);
- break;
- }
- Thread.interrupted(); // clear status for next park
- if (idle) { // check for idle timeout
- if (deadline - System.currentTimeMillis() < TIMEOUT_SLOP) {
- if (tryTrim(w))
- return -1;
- else // not at head; restart timer
- deadline += keepAlive;
+ for (;;) {
+ int b, k; Object o;
+ ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
+ U.loadFence(); // re-read b and t
+ if (q.base == b) { // else inconsistent; retry
+ int nb = b + 1, nk = nb & (cap - 1);
+ if (t == null) {
+ if (a[k] == null) { // revisit if another task
+ if (window >= 0L && a[nk] != null)
+ window |= RESCAN;
+ break;
+ }
+ }
+ else if (t == (o = U.compareAndExchangeReference(
+ a, slotOffset(k), t, null))) {
+ q.updateBase(nb);
+ long pw = window, nw = ((pw << 16) | j) & WMASK;
+ window = nw | RESCAN;
+ if ((nw != pw || (short)(nw >>> 32) != j) &&
+ a[nk] != null)
+ signalWork(); // limit propagation
+ if (w != null) // always true
+ w.topLevelExec(t, q, j);
+ break outer;
+ }
+ else if (o == null) // contended
+ break; // retried unless newly active
}
}
}
}
- return 0;
+ return window;
}
/**
- * Non-overridable version of isQuiescent. Returns true if
- * quiescent or already terminating.
+ * Tries to inactivate, and if successful, awaits signal or termination.
+ *
+ * @param w the worker (may be null if already terminated)
+ * @param p current phase
+ * @return current phase, with IDLE set if worker should exit
*/
- private boolean canStop() {
- long c = ctl;
- do {
- if (runState < 0)
- break;
- if ((c & RC_MASK) > 0L || hasTasks(false))
- return false;
- } while (c != (c = ctl)); // validate
- return true;
+ private int awaitWork(WorkQueue w, int p) {
+ if (w != null) {
+ int idlePhase = p + IDLE, nextPhase = p + (IDLE << 1);
+ long pc = ctl, qc = (nextPhase & LMASK) | ((pc - RC_UNIT) & UMASK);
+ w.stackPred = (int)pc; // set ctl stack link
+ w.phase = idlePhase; // try to inactivate
+ if (!compareAndSetCtl(pc, qc)) // contended enque
+ return w.phase = p; // back out
+ int ac = (short)(qc >>> RC_SHIFT);
+ boolean quiescent = (ac <= 0 && quiescent());
+ if ((runState & STOP) != 0)
+ return idlePhase;
+ int spins = ac + ((((int)(qc >>> TC_SHIFT)) & SMASK) << 1);
+ while ((p = w.phase) == idlePhase && --spins > 0)
+ Thread.onSpinWait(); // spin for approx #accesses to signal
+ if (p == idlePhase) {
+ long deadline = (!quiescent ? 0L : // timeout for trim
+ System.currentTimeMillis() + keepAlive);
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) { // recheck queues
+ WorkQueue q; ForkJoinTask<?>[] a; int cap;
+ if ((q = qs[i]) != null &&
+ (a = q.array) != null && (cap = a.length) > 0 &&
+ a[q.base & (cap - 1)] != null &&
+ ctl == qc && compareAndSetCtl(qc, pc)) {
+ w.phase = (int)qc; // release
+ break;
+ }
+ }
+ if ((p = w.phase) == idlePhase) { // emulate LockSupport.park
+ LockSupport.setCurrentBlocker(this);
+ w.parker = Thread.currentThread();
+ for (;;) {
+ if ((runState & STOP) != 0 || (p = w.phase) != idlePhase)
+ break;
+ U.park(quiescent, deadline);
+ if ((p = w.phase) != idlePhase || (runState & STOP) != 0)
+ break;
+ Thread.interrupted(); // clear for next park
+ if (quiescent && TIMEOUT_SLOP >
+ deadline - System.currentTimeMillis()) {
+ long sp = w.stackPred & LMASK;
+ long c = ctl, nc = sp | (UMASK & (c - TC_UNIT));
+ if (((int)c & SMASK) == (idlePhase & SMASK) &&
+ compareAndSetCtl(c, nc)) {
+ w.source = DEREGISTERED;
+ w.phase = (int)c;
+ break;
+ }
+ deadline += keepAlive; // not head; reset timer
+ }
+ }
+ w.parker = null;
+ LockSupport.setCurrentBlocker(null);
+ }
+ }
+ }
+ return p;
}
/**
@@ -1934,16 +2165,18 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param submissionsOnly if true, only scan submission queues
*/
private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
- int r = ThreadLocalRandom.nextSecondarySeed();
- if (submissionsOnly) // even indices only
- r &= ~1;
- int step = (submissionsOnly) ? 2 : 1;
- WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
- if (runState >= 0 && (qs = queues) != null && (n = qs.length) > 0) {
- for (int i = n; i > 0; i -= step, r += step) {
- if ((q = qs[r & (n - 1)]) != null &&
- (t = q.poll(this)) != null)
- return t;
+ if ((runState & STOP) == 0) {
+ WorkQueue[] qs; int n; WorkQueue q; ForkJoinTask<?> t;
+ int r = ThreadLocalRandom.nextSecondarySeed();
+ if (submissionsOnly) // even indices only
+ r &= ~1;
+ int step = (submissionsOnly) ? 2 : 1;
+ if ((qs = queues) != null && (n = qs.length) > 0) {
+ for (int i = n; i > 0; i -= step, r += step) {
+ if ((q = qs[r & (n - 1)]) != null &&
+ (t = q.poll(this)) != null)
+ return t;
+ }
}
}
return null;
@@ -1958,47 +2191,48 @@ public class ForkJoinPool extends AbstractExecutorService {
* unblocked.
*
* @param c incoming ctl value
- * @param canSaturate to override saturate predicate
* @return UNCOMPENSATE: block then adjust, 0: block, -1 : retry
*/
- private int tryCompensate(long c, boolean canSaturate) {
+ private int tryCompensate(long c) {
Predicate<? super ForkJoinPool> sat;
- long b = bounds; // unpack fields
- int pc = parallelism;
- int minActive = (short)(b & SMASK),
- maxTotal = (short)(b >>> SWIDTH) + pc,
+ long b = config;
+ int pc = parallelism, // unpack fields
+ minActive = (short)(b >>> RC_SHIFT),
+ maxTotal = (short)(b >>> TC_SHIFT) + pc,
active = (short)(c >>> RC_SHIFT),
total = (short)(c >>> TC_SHIFT),
- sp = (int)c & ~INACTIVE;
- if (sp != 0 && active <= pc) { // activate idle worker
- WorkQueue[] qs; WorkQueue v; int i;
- if (ctl == c && (qs = queues) != null &&
- qs.length > (i = sp & SMASK) && (v = qs[i]) != null) {
- long nc = (v.stackPred & SP_MASK) | (UC_MASK & c);
- if (compareAndSetCtl(c, nc)) {
- v.phase = sp;
- LockSupport.unpark(v.owner);
- return UNCOMPENSATE;
- }
+ sp = (int)c,
+ stat = -1; // default retry return
+ if (sp != 0 && active <= pc) { // activate idle worker
+ WorkQueue[] qs; WorkQueue v; int i; Thread t;
+ if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
+ (v = qs[i]) != null &&
+ compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
+ v.phase = sp;
+ if ((t = v.parker) != null)
+ U.unpark(t);
+ stat = UNCOMPENSATE;
}
- return -1; // retry
}
else if (active > minActive && total >= pc) { // reduce active workers
- long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
- return compareAndSetCtl(c, nc) ? UNCOMPENSATE : -1;
+ if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
+ stat = UNCOMPENSATE;
}
- else if (total < maxTotal && total < MAX_CAP) { // expand pool
+ else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
- return (!compareAndSetCtl(c, nc) ? -1 :
- !createWorker() ? 0 : UNCOMPENSATE);
+ if ((runState & STOP) != 0) // terminating
+ stat = 0;
+ else if (compareAndSetCtl(c, nc))
+ stat = createWorker() ? UNCOMPENSATE : 0;
}
else if (!compareAndSetCtl(c, c)) // validate
- return -1;
- else if (canSaturate || ((sat = saturate) != null && sat.test(this)))
- return 0;
+ ;
+ else if ((sat = saturate) != null && sat.test(this))
+ stat = 0;
else
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
+ return stat;
}
/**
@@ -2016,234 +2250,256 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param task the task
* @param w caller's WorkQueue
- * @param timed true if this is a timed join
+ * @param internal true if w is owned by a ForkJoinWorkerThread
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
- final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean timed) {
- if (w == null || task == null)
- return 0;
- int wsrc = w.source, wid = (w.config & SMASK) | SRC, r = wid + 2;
- long sctl = 0L; // track stability
- for (boolean rescan = true;;) {
- int s; WorkQueue[] qs;
- if ((s = task.status) < 0)
- return s;
- if (!rescan && sctl == (sctl = ctl)) {
- if (runState < 0)
- return 0;
- if ((s = tryCompensate(sctl, timed)) >= 0)
- return s; // block
- }
- rescan = false;
- int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
- scan: for (int i = n >>> 1; i > 0; --i, r += 2) {
- int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
- if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
- (cap = a.length) > 0) {
- for (int src = j | SRC;;) {
- int sq = q.source, b = q.base;
- int k = (cap - 1) & b, nb = b + 1;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- boolean eligible = true; // check steal chain
- for (int d = n, v = sq;;) { // may be cyclic; bound
- WorkQueue p;
- if (v == wid)
- break;
- if (v == 0 || --d == 0 || (p = qs[v & m]) == null) {
- eligible = false;
+
+ final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
+ if (w != null)
+ w.tryRemoveAndExec(task, internal);
+ int s = 0;
+ if (task != null && (s = task.status) >= 0 && internal && w != null) {
+ int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source;
+ long sctl = 0L; // track stability
+ outer: for (boolean rescan = true;;) {
+ if ((s = task.status) < 0)
+ break;
+ if (!rescan) {
+ if ((runState & STOP) != 0)
+ break;
+ if (sctl == (sctl = ctl) && (s = tryCompensate(sctl)) >= 0)
+ break;
+ }
+ rescan = false;
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ scan: for (int l = n >>> 1; l > 0; --l, r += 2) {
+ int j; WorkQueue q;
+ if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
+ for (;;) {
+ int sq = q.source, b, cap, k; ForkJoinTask<?>[] a;
+ if ((a = q.array) == null || (cap = a.length) <= 0)
break;
+ ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
+ U.loadFence();
+ boolean eligible = false;
+ if (t == task)
+ eligible = true;
+ else if (t != null) { // check steal chain
+ for (int v = sq, d = cap;;) {
+ WorkQueue p;
+ if (v == wid) {
+ eligible = true;
+ break;
+ }
+ if ((v & 1) == 0 || // external or none
+ --d < 0 || // bound depth
+ (p = qs[v & (n - 1)]) == null)
+ break;
+ v = p.source;
+ }
}
- v = p.source;
- }
- if (q.source != sq || q.base != b)
- ; // stale
- else if ((s = task.status) < 0)
- return s; // recheck before taking
- else if (t == null) {
- if (a[k] == null) {
- if (!rescan && eligible &&
- (q.array != a || q.top != b))
- rescan = true; // resized or stalled
- break;
+ if ((s = task.status) < 0)
+ break outer; // validate
+ if (q.source == sq && q.base == b && a[k] == t) {
+ int nb = b + 1, nk = nb & (cap - 1);
+ if (!eligible) { // revisit if nonempty
+ if (!rescan && t == null &&
+ (a[nk] != null || q.top - b > 0))
+ rescan = true;
+ break;
+ }
+ if (U.compareAndSetReference(
+ a, slotOffset(k), t, null)) {
+ q.updateBase(nb);
+ w.source = j;
+ t.doExec();
+ w.source = wsrc;
+ rescan = true; // restart at index r
+ break scan;
+ }
}
}
- else if (t != task && !eligible)
- break;
- else if (WorkQueue.casSlotToNull(a, k, t)) {
- q.base = nb;
- w.source = src;
- t.doExec();
- w.source = wsrc;
- rescan = true;
- break scan;
- }
}
}
}
}
+ return s;
}
/**
* Version of helpJoin for CountedCompleters.
*
- * @param task the task
+ * @param task root of computation (only called when a CountedCompleter)
* @param w caller's WorkQueue
- * @param owned true if w is owned by a ForkJoinWorkerThread
- * @param timed true if this is a timed join
+ * @param internal true if w is owned by a ForkJoinWorkerThread
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
- final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned,
- boolean timed) {
- if (w == null || task == null)
- return 0;
- int wsrc = w.source, r = w.config;
- long sctl = 0L; // track stability
- for (boolean rescan = true;;) {
- int s; WorkQueue[] qs;
- if ((s = w.helpComplete(task, owned, 0)) < 0)
- return s;
- if (!rescan && sctl == (sctl = ctl)) {
- if (!owned || runState < 0)
- return 0;
- if ((s = tryCompensate(sctl, timed)) >= 0)
- return s;
- }
- rescan = false;
- int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
- scan: for (int i = n; i > 0; --i, ++r) {
- int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
- if ((q = qs[j = r & m]) != null && (a = q.array) != null &&
- (cap = a.length) > 0) {
- poll: for (int src = j | SRC, b = q.base;;) {
- int k = (cap - 1) & b, nb = b + 1;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (b != (b = q.base))
- ; // stale
- else if ((s = task.status) < 0)
- return s; // recheck before taking
- else if (t == null) {
- if (a[k] == null) {
- if (!rescan && // resized or stalled
- (q.array != a || q.top != b))
- rescan = true;
+ final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean internal) {
+ int s = 0;
+ if (task != null && (s = task.status) >= 0 && w != null) {
+ int r = w.phase + 1; // for indexing
+ long sctl = 0L; // track stability
+ outer: for (boolean rescan = true, locals = true;;) {
+ if (locals && (s = w.helpComplete(task, internal, 0)) < 0)
+ break;
+ if ((s = task.status) < 0)
+ break;
+ if (!rescan) {
+ if ((runState & STOP) != 0)
+ break;
+ if (sctl == (sctl = ctl) &&
+ (!internal || (s = tryCompensate(sctl)) >= 0))
+ break;
+ }
+ rescan = locals = false;
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ scan: for (int l = n; l > 0; --l, ++r) {
+ int j; WorkQueue q;
+ if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
+ for (;;) {
+ ForkJoinTask<?>[] a; int b, cap, k;
+ if ((a = q.array) == null || (cap = a.length) <= 0)
break;
+ ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
+ U.loadFence();
+ boolean eligible = false;
+ if (t instanceof CountedCompleter) {
+ CountedCompleter<?> f = (CountedCompleter<?>)t;
+ for (int steps = cap; steps > 0; --steps) {
+ if (f == task) {
+ eligible = true;
+ break;
+ }
+ if ((f = f.completer) == null)
+ break;
+ }
}
- }
- else if (t instanceof CountedCompleter) {
- CountedCompleter<?> f;
- for (f = (CountedCompleter<?>)t;;) {
- if (f == task)
+ if ((s = task.status) < 0) // validate
+ break outer;
+ if (q.base == b) {
+ int nb = b + 1, nk = nb & (cap - 1);
+ if (eligible) {
+ if (U.compareAndSetReference(
+ a, slotOffset(k), t, null)) {
+ q.updateBase(nb);
+ t.doExec();
+ locals = rescan = true;
+ break scan;
+ }
+ }
+ else if (a[k] == t) {
+ if (!rescan && t == null &&
+ (a[nk] != null || q.top - b > 0))
+ rescan = true; // revisit
break;
- else if ((f = f.completer) == null)
- break poll; // ineligible
- }
- if (WorkQueue.casSlotToNull(a, k, t)) {
- q.base = nb;
- w.source = src;
- t.doExec();
- w.source = wsrc;
- rescan = true;
- break scan;
+ }
}
}
- else
- break;
}
}
}
}
+ return s;
}
/**
- * Runs tasks until {@code isQuiescent()}. Rather than blocking
- * when tasks cannot be found, rescans until all others cannot
- * find tasks either.
+ * Runs tasks until all workers are inactive and no tasks are
+ * found. Rather than blocking when tasks cannot be found, rescans
+ * until all others cannot find tasks either.
*
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
* @param interruptible true if return on interrupt
* @return positive if quiescent, negative if interrupted, else 0
*/
private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
- long startTime = System.nanoTime(), parkTime = 0L;
- int phase; // w.phase set negative when temporarily quiescent
- if (w == null || (phase = w.phase) < 0)
+ int phase; // w.phase inactive bit set when temporarily quiescent
+ if (w == null || ((phase = w.phase) & IDLE) != 0)
return 0;
- int activePhase = phase, inactivePhase = phase | INACTIVE;
- int wsrc = w.source, r = 0;
- for (boolean locals = true;;) {
- WorkQueue[] qs; WorkQueue q;
- if (runState < 0) { // terminating
- w.phase = activePhase;
- return 1;
+ int wsrc = w.source;
+ long startTime = System.nanoTime();
+ long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP); // approx 1% nanos
+ long prevSum = 0L;
+ int activePhase = phase, inactivePhase = phase + IDLE;
+ int r = phase + 1, waits = 0, returnStatus = 1;
+ boolean locals = true;
+ for (int e = runState;;) {
+ if ((e & STOP) != 0)
+ break; // terminating
+ if (interruptible && Thread.interrupted()) {
+ returnStatus = -1;
+ break;
}
if (locals) { // run local tasks before (re)polling
+ locals = false;
for (ForkJoinTask<?> u; (u = w.nextLocalTask()) != null;)
u.doExec();
}
- boolean rescan = false, busy = locals = false, interrupted;
- int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
- scan: for (int i = n, j; i > 0; --i, ++r) {
- if ((q = qs[j = m & r]) != null && q != w) {
- for (int src = j | SRC;;) {
- ForkJoinTask<?>[] a = q.array;
- int b = q.base, cap;
- if (a == null || (cap = a.length) <= 0)
- break;
- int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
- ForkJoinTask<?> t = a[k];
- U.loadFence(); // for re-reads
- if (q.base != b || q.array != a || a[k] != t)
- ;
- else if (t == null) {
- if (!rescan) {
- if (a[nk] != null || q.top - b > 0)
- rescan = true;
- else if (!busy &&
- q.owner != null && q.phase >= 0)
- busy = true;
- }
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ long phaseSum = 0L;
+ boolean rescan = false, busy = false;
+ scan: for (int l = n; l > 0; --l, ++r) {
+ int j; WorkQueue q;
+ if ((q = qs[j = r & SMASK & (n - 1)]) != null && q != w) {
+ for (;;) {
+ ForkJoinTask<?>[] a; int b, cap, k;
+ if ((a = q.array) == null || (cap = a.length) <= 0)
break;
- }
- else if (phase < 0) // reactivate before taking
+ ForkJoinTask<?> t = a[k = (b = q.base) & (cap - 1)];
+ if (t != null && phase == inactivePhase) // reactivate
w.phase = phase = activePhase;
- else if (WorkQueue.casSlotToNull(a, k, t)) {
- q.base = nb;
- w.source = src;
- t.doExec();
- w.source = wsrc;
- rescan = locals = true;
- break scan;
+ U.loadFence();
+ if (q.base == b && a[k] == t) {
+ int nb = b + 1;
+ if (t == null) {
+ if (!rescan) {
+ int qp = q.phase, mq = qp & (IDLE | 1);
+ phaseSum += qp;
+ if (mq == 0 || q.top - b > 0)
+ rescan = true;
+ else if (mq == 1)
+ busy = true;
+ }
+ break;
+ }
+ if (U.compareAndSetReference(
+ a, slotOffset(k), t, null)) {
+ q.updateBase(nb);
+ w.source = j;
+ t.doExec();
+ w.source = wsrc;
+ rescan = locals = true;
+ break scan;
+ }
}
}
}
}
- if (rescan)
- ; // retry
- else if (phase >= 0) {
- parkTime = 0L;
+ if (e != (e = runState) || prevSum != (prevSum = phaseSum) ||
+ rescan || (e & RS_LOCK) != 0)
+ ; // inconsistent
+ else if (!busy)
+ break;
+ else if (phase == activePhase) {
+ waits = 0; // recheck, then sleep
w.phase = phase = inactivePhase;
}
- else if (!busy) {
- w.phase = activePhase;
- return 1;
- }
- else if (parkTime == 0L) {
- parkTime = 1L << 10; // initially about 1 usec
- Thread.yield();
- }
- else if ((interrupted = interruptible && Thread.interrupted()) ||
- System.nanoTime() - startTime > nanos) {
- w.phase = activePhase;
- return interrupted ? -1 : 0;
+ else if (System.nanoTime() - startTime > nanos) {
+ returnStatus = 0; // timed out
+ break;
}
+ else if (waits == 0) // same as spinLockRunState except
+ waits = MIN_SLEEP; // with rescan instead of onSpinWait
else {
- LockSupport.parkNanos(this, parkTime);
- if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
- parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
+ LockSupport.parkNanos(this, (long)waits);
+ if (waits < maxSleep)
+ waits <<= 1;
}
}
+ w.phase = activePhase;
+ return returnStatus;
}
/**
@@ -2254,28 +2510,31 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return positive if quiescent, negative if interrupted, else 0
*/
private int externalHelpQuiesce(long nanos, boolean interruptible) {
- for (long startTime = System.nanoTime(), parkTime = 0L;;) {
- ForkJoinTask<?> t;
- if ((t = pollScan(false)) != null) {
- t.doExec();
- parkTime = 0L;
- }
- else if (canStop())
- return 1;
- else if (parkTime == 0L) {
- parkTime = 1L << 10;
- Thread.yield();
- }
- else if ((System.nanoTime() - startTime) > nanos)
- return 0;
- else if (interruptible && Thread.interrupted())
- return -1;
- else {
- LockSupport.parkNanos(this, parkTime);
- if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
- parkTime <<= 1;
+ if (!quiescent()) {
+ long startTime = System.nanoTime();
+ long maxSleep = Math.min(nanos >>> 8, MAX_SLEEP);
+ for (int waits = 0;;) {
+ ForkJoinTask<?> t;
+ if (interruptible && Thread.interrupted())
+ return -1;
+ else if ((t = pollScan(false)) != null) {
+ waits = 0;
+ t.doExec();
+ }
+ else if (quiescent())
+ break;
+ else if (System.nanoTime() - startTime > nanos)
+ return 0;
+ else if (waits == 0)
+ waits = MIN_SLEEP;
+ else {
+ LockSupport.parkNanos(this, (long)waits);
+ if (waits < maxSleep)
+ waits <<= 1;
+ }
}
}
+ return 1;
}
/**
@@ -2316,59 +2575,69 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Finds and locks a WorkQueue for an external submitter, or
* throws RejectedExecutionException if shutdown or terminating.
+ * @param r current ThreadLocalRandom.getProbe() value
* @param isSubmit false if this is for a common pool fork
*/
- final WorkQueue submissionQueue(boolean isSubmit) {
- int r;
- ReentrantLock lock = registrationLock;
- if ((r = ThreadLocalRandom.getProbe()) == 0) {
+ private WorkQueue submissionQueue(int r) {
+ if (r == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
- if (lock != null) { // else init error
- for (int id = r << 1;;) { // even indices only
- int n, i; WorkQueue[] qs; WorkQueue q;
- if ((qs = queues) == null || (n = qs.length) <= 0)
- break;
- else if ((q = qs[i = (n - 1) & id]) == null) {
- WorkQueue w = new WorkQueue(null, id | SRC);
- w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
- lock.lock(); // install under lock
- if (queues == qs && qs[i] == null)
- qs[i] = w; // else lost race; discard
- lock.unlock();
- }
- else if (q.getAndSetAccess(1) != 0) // move and restart
- id = (r = ThreadLocalRandom.advanceProbe(r)) << 1;
- else if (isSubmit && runState != 0) {
- q.access = 0; // check while lock held
- break;
- }
- else
+ for (;;) {
+ int n, i, id; WorkQueue[] qs; WorkQueue q;
+ if ((qs = queues) == null)
+ break;
+ if ((n = qs.length) <= 0)
+ break;
+ if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
+ WorkQueue w = new WorkQueue(null, id, 0, false);
+ w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
+ int stop = lockRunState() & STOP;
+ if (stop == 0 && queues == qs && qs[i] == null)
+ q = qs[i] = w; // else discard; retry
+ unlockRunState();
+ if (q != null)
return q;
+ if (stop != 0)
+ break;
+ }
+ else if (!q.tryLockPhase()) // move index
+ r = ThreadLocalRandom.advanceProbe(r);
+ else if ((runState & SHUTDOWN) != 0) {
+ q.unlockPhase(); // check while q lock held
+ break;
}
+ else
+ return q;
}
+ tryTerminate(false, false);
throw new RejectedExecutionException();
}
- /**
- * Pushes a submission to the pool, using internal queue if called
- * from ForkJoinWorkerThread, else external queue.
- */
- private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty,
- ForkJoinTask<T> task) {
- WorkQueue q; Thread t; ForkJoinWorkerThread wt;
- U.storeStoreFence(); // ensure safely publishable
- if (task == null) throw new NullPointerException();
+ private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
+ Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
- (wt = (ForkJoinWorkerThread)t).pool == this)
+ (wt = (ForkJoinWorkerThread)t).pool == this) {
+ internal = true;
q = wt.workQueue;
- else {
- task.markPoolSubmission();
- q = submissionQueue(true);
}
- q.push(task, this, signalIfEmpty);
- return task;
+ else { // find and lock queue
+ internal = false;
+ q = submissionQueue(ThreadLocalRandom.getProbe());
+ }
+ q.push(task, signalIfEmpty ? this : null, internal);
+ }
+
+ /**
+ * Returns queue for an external submission, bypassing call to
+ * submissionQueue if already established and unlocked.
+ */
+ final WorkQueue externalSubmissionQueue() {
+ WorkQueue[] qs; WorkQueue q; int n;
+ int r = ThreadLocalRandom.getProbe();
+ return (((qs = queues) != null && (n = qs.length) > 0 &&
+ (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
+ q.tryLockPhase()) ? q : submissionQueue(r));
}
/**
@@ -2376,12 +2645,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* possibly ever submitted to the given pool (nonzero probe), or
* null if none.
*/
- private static WorkQueue externalQueue(ForkJoinPool p) {
- WorkQueue[] qs;
- int r = ThreadLocalRandom.getProbe(), n;
+ static WorkQueue externalQueue(ForkJoinPool p) {
+ WorkQueue[] qs; int n;
+ int r = ThreadLocalRandom.getProbe();
return (p != null && (qs = p.queues) != null &&
(n = qs.length) > 0 && r != 0) ?
- qs[(n - 1) & (r << 1)] : null;
+ qs[r & EXTERNAL_ID_MASK & (n - 1)] : null;
}
/**
@@ -2392,25 +2661,17 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Returns queue for an external thread, if one exists
- */
- final WorkQueue externalQueue() {
- return externalQueue(this);
- }
-
- /**
* If the given executor is a ForkJoinPool, poll and execute
* AsynchronousCompletionTasks from worker's queue until none are
* available or blocker is released.
*/
static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
WorkQueue w = null; Thread t; ForkJoinWorkerThread wt;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- if ((wt = (ForkJoinWorkerThread)t).pool == e)
- w = wt.workQueue;
- }
+ if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
+ (wt = (ForkJoinWorkerThread)t).pool == e)
+ w = wt.workQueue;
else if (e instanceof ForkJoinPool)
- w = ((ForkJoinPool)e).externalQueue();
+ w = externalQueue((ForkJoinPool)e);
if (w != null)
w.helpAsyncBlocker(blocker);
}
@@ -2482,59 +2743,83 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
* @param enable if true, terminate when next possible
- * @return true if terminating or terminated
- */
- private boolean tryTerminate(boolean now, boolean enable) {
- int rs; ReentrantLock lock; Condition cond;
- if ((rs = runState) >= 0) { // set SHUTDOWN and/or STOP
- if ((config & ISCOMMON) != 0)
- return false; // cannot shutdown
- if (!now) {
- if ((rs & SHUTDOWN) == 0) {
- if (!enable)
- return false;
- getAndBitwiseOrRunState(SHUTDOWN);
- }
- if (!canStop())
- return false;
+ * @return runState on exit
+ */
+ private int tryTerminate(boolean now, boolean enable) {
+ int e = runState;
+ if ((e & STOP) == 0) {
+ if (now) {
+ int s = lockRunState();
+ runState = e = (s + RS_LOCK) | STOP | SHUTDOWN;
+ if ((s & STOP) == 0)
+ interruptAll();
+ }
+ else {
+ int isShutdown = (e & SHUTDOWN);
+ if (isShutdown == 0 && enable)
+ getAndBitwiseOrRunState(isShutdown = SHUTDOWN);
+ if (isShutdown != 0)
+ quiescent(); // may trigger STOP
+ e = runState;
}
- getAndBitwiseOrRunState(SHUTDOWN | STOP);
- }
- WorkQueue released = reactivate(); // try signalling waiter
- int tc = (short)(ctl >>> TC_SHIFT);
- if (released == null && tc > 0) { // help unblock and cancel
- Thread current = Thread.currentThread();
- WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)current).workQueue : null);
- int r = (w == null) ? 0 : w.config + 1; // stagger traversals
+ }
+ if ((e & (STOP | TERMINATED)) == STOP) { // help cancel tasks
+ int r = (int)Thread.currentThread().threadId(); // stagger traversals
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) {
- WorkQueue q; Thread thread;
- if ((q = qs[(r + i) & (n - 1)]) != null &&
- (thread = q.owner) != current && q.access != STOP) {
- for (ForkJoinTask<?> t; (t = q.poll(null)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- if (thread != null && !thread.isInterrupted()) {
- q.forcePhaseActive(); // for awaitWork
- try {
- thread.interrupt();
- } catch (Throwable ignore) {
- }
+ for (int l = n; l > 0; --l, ++r) {
+ int j = r & SMASK & (n - 1); WorkQueue q; ForkJoinTask<?> t;
+ while ((q = qs[j]) != null && q.source != DEREGISTERED &&
+ (t = q.poll(null)) != null) {
+ try {
+ t.cancel(false);
+ } catch (Throwable ignore) {
}
}
}
+ if (((e = runState) & TERMINATED) == 0 && ctl == 0L) {
+ if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0) {
+ CountDownLatch done; SharedThreadContainer ctr;
+ if ((done = termination) != null)
+ done.countDown();
+ if ((ctr = container) != null)
+ ctr.close();
+ }
+ e = runState;
+ }
}
- if ((tc <= 0 || (short)(ctl >>> TC_SHIFT) <= 0) &&
- (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
- (lock = registrationLock) != null) {
- lock.lock(); // signal when no workers
- if ((cond = termination) != null)
- cond.signalAll();
- lock.unlock();
- container.close();
+ return e;
+ }
+
+ /**
+ * Interrupts all workers
+ */
+ private void interruptAll() {
+ Thread current = Thread.currentThread();
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ for (int i = 1; i < n; i += 2) {
+ WorkQueue q; Thread o;
+ if ((q = qs[i]) != null && (o = q.owner) != null && o != current &&
+ q.source != DEREGISTERED) {
+ try {
+ o.interrupt();
+ } catch (Throwable ignore) {
+ }
+ }
}
- return true;
+ }
+
+
+ /**
+ * Returns termination signal, constructing if necessary
+ */
+ private CountDownLatch terminationSignal() {
+ CountDownLatch signal, s, u;
+ if ((signal = termination) == null)
+ signal = ((u = cmpExTerminationSignal(
+ s = new CountDownLatch(1))) == null) ? s : u;
+ return signal;
}
// Exported methods
@@ -2708,19 +2993,17 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new IllegalArgumentException();
if (factory == null || unit == null)
throw new NullPointerException();
+ int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
this.parallelism = p;
this.factory = factory;
this.ueh = handler;
this.saturate = saturate;
- this.config = asyncMode ? FIFO : 0;
this.keepAlive = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
- int corep = Math.clamp(corePoolSize, p, MAX_CAP);
int maxSpares = Math.clamp(maximumPoolSize - p, 0, MAX_CAP);
int minAvail = Math.clamp(minimumRunnable, 0, MAX_CAP);
- this.bounds = (long)(minAvail & SMASK) | (long)(maxSpares << SWIDTH) |
- ((long)corep << 32);
- int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
- this.registrationLock = new ReentrantLock();
+ this.config = (((asyncMode ? FIFO : 0) & LMASK) |
+ (((long)maxSpares) << TC_SHIFT) |
+ (((long)minAvail) << RC_SHIFT));
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
String name = "ForkJoinPool-" + pid;
@@ -2768,14 +3051,13 @@ public class ForkJoinPool extends AbstractExecutorService {
int p = Math.min(pc, MAX_CAP);
int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
this.parallelism = p;
- this.config = ISCOMMON | preset;
- this.bounds = (long)(1 | (maxSpares << SWIDTH));
+ this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
+ (1L << RC_SHIFT));
this.factory = fac;
this.ueh = handler;
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
- this.registrationLock = new ReentrantLock();
this.queues = new WorkQueue[size];
this.container = SharedThreadContainer.create("ForkJoinPool.commonPool");
}
@@ -2818,6 +3100,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
+ Objects.requireNonNull(task);
poolSubmit(true, task);
return task.join();
}
@@ -2831,6 +3114,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
+ Objects.requireNonNull(task);
poolSubmit(true, task);
}
@@ -2864,7 +3148,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- return poolSubmit(true, task);
+ Objects.requireNonNull(task);
+ poolSubmit(true, task);
+ return task;
}
/**
@@ -2874,7 +3160,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
- return poolSubmit(true, new ForkJoinTask.AdaptedCallable<T>(task));
+ ForkJoinTask<T> t =
+ (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+ new ForkJoinTask.AdaptedCallable<T>(task) :
+ new ForkJoinTask.AdaptedInterruptibleCallable<T>(task);
+ poolSubmit(true, t);
+ return t;
}
/**
@@ -2884,7 +3175,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
- return poolSubmit(true, new ForkJoinTask.AdaptedRunnable<T>(task, result));
+ ForkJoinTask<T> t =
+ (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+ new ForkJoinTask.AdaptedRunnable<T>(task, result) :
+ new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result);
+ poolSubmit(true, t);
+ return t;
}
/**
@@ -2895,13 +3191,15 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
@SuppressWarnings("unchecked")
public ForkJoinTask<?> submit(Runnable task) {
- return poolSubmit(true, (task instanceof ForkJoinTask<?>)
- ? (ForkJoinTask<Void>) task // avoid re-wrap
- : new ForkJoinTask.AdaptedRunnableAction(task));
+ ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ?
+ (ForkJoinTask<Void>) task : // avoid re-wrap
+ ((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+ new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
+ new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null));
+ poolSubmit(true, f);
+ return f;
}
- // Added mainly for possible use in Loom
-
/**
* Submits the given task as if submitted from a non-{@code ForkJoinTask}
* client. The task is added to a scheduling queue for submissions to the
@@ -2920,10 +3218,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @since 20
*/
public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
- U.storeStoreFence(); // ensure safely publishable
- task.markPoolSubmission();
- WorkQueue q = submissionQueue(true);
- q.push(task, this, true);
+ Objects.requireNonNull(task);
+ externalSubmissionQueue().push(task, this, false);
return task;
}
@@ -2944,7 +3240,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* @since 19
*/
public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
- return poolSubmit(false, task);
+ Objects.requireNonNull(task);
+ poolSubmit(false, task);
+ return task;
}
/**
@@ -2980,16 +3278,34 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * @throws NullPointerException {@inheritDoc}
- * @throws RejectedExecutionException {@inheritDoc}
+ * Uninterrupible version of {@code invokeAll}. Executes the given
+ * tasks, returning a list of Futures holding their status and
+ * results when all complete, ignoring interrupts. {@link
+ * Future#isDone} is {@code true} for each element of the returned
+ * list. Note that a <em>completed</em> task could have
+ * terminated either normally or by throwing an exception. The
+ * results of this method are undefined if the given collection is
+ * modified while this operation is in progress.
+ *
+ * @apiNote This method supports usages that previously relied on an
+ * incompatible override of
+ * {@link ExecutorService#invokeAll(java.util.Collection)}.
+ *
+ * @param tasks the collection of tasks
+ * @param <T> the type of the values returned from the tasks
+ * @return a list of Futures representing the tasks, in the same
+ * sequential order as produced by the iterator for the
+ * given task list, each of which has completed
+ * @throws NullPointerException if tasks or any of its elements are {@code null}
+ * @throws RejectedExecutionException if any task cannot be
+ * scheduled for execution
+ * @since 22
*/
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+ public <T> List<Future<T>> invokeAllUninterruptibly(Collection<? extends Callable<T>> tasks) {
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
- ForkJoinTask<T> f =
- new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
+ ForkJoinTask<T> f = ForkJoinTask.adapt(t);
futures.add(f);
poolSubmit(true, f);
}
@@ -2998,139 +3314,66 @@ public class ForkJoinPool extends AbstractExecutorService {
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
- ForkJoinTask.cancelIgnoringExceptions(e);
+ e.cancel(true);
throw t;
}
}
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
+ /**
+ * Common support for timed and untimed invokeAll
+ */
+ private <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long deadline)
throws InterruptedException {
- long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
- ForkJoinTask<T> f =
- new ForkJoinTask.AdaptedInterruptibleCallable<T>(t);
+ ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
futures.add(f);
poolSubmit(true, f);
}
- long startTime = System.nanoTime(), ns = nanos;
- boolean timedOut = (ns < 0L);
- for (int i = futures.size() - 1; i >= 0; --i) {
- ForkJoinTask<T> f = (ForkJoinTask<T>)futures.get(i);
- if (!f.isDone()) {
- if (!timedOut)
- timedOut = !f.quietlyJoin(ns, TimeUnit.NANOSECONDS);
- if (timedOut)
- ForkJoinTask.cancelIgnoringExceptions(f);
- else
- ns = nanos - (System.nanoTime() - startTime);
- }
- }
+ for (int i = futures.size() - 1; i >= 0; --i)
+ ((ForkJoinTask<?>)futures.get(i))
+ .quietlyJoinPoolInvokeAllTask(deadline);
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
- ForkJoinTask.cancelIgnoringExceptions(e);
+ e.cancel(true);
throw t;
}
}
- // Task to hold results from InvokeAnyTasks
- static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
- private static final long serialVersionUID = 2838392045355241008L;
- @SuppressWarnings("serial") // Conditionally serializable
- volatile E result;
- final AtomicInteger count; // in case all throw
- @SuppressWarnings("serial")
- final ForkJoinPool pool; // to check shutdown while collecting
- InvokeAnyRoot(int n, ForkJoinPool p) {
- pool = p;
- count = new AtomicInteger(n);
- }
- final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
- Throwable ex = null;
- boolean failed;
- if (c == null || Thread.interrupted() ||
- (pool != null && pool.runState < 0))
- failed = true;
- else if (isDone())
- failed = false;
- else {
- try {
- complete(c.call());
- failed = false;
- } catch (Throwable tx) {
- ex = tx;
- failed = true;
- }
- }
- if ((pool != null && pool.runState < 0) ||
- (failed && count.getAndDecrement() <= 1))
- trySetThrown(ex != null ? ex : new CancellationException());
- }
- public final boolean exec() { return false; } // never forked
- public final E getRawResult() { return result; }
- public final void setRawResult(E v) { result = v; }
- }
-
- // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
- static final class InvokeAnyTask<E> extends ForkJoinTask<E> {
- private static final long serialVersionUID = 2838392045355241008L;
- final InvokeAnyRoot<E> root;
- @SuppressWarnings("serial") // Conditionally serializable
- final Callable<E> callable;
- transient volatile Thread runner;
- InvokeAnyTask(InvokeAnyRoot<E> root, Callable<E> callable) {
- this.root = root;
- this.callable = callable;
- }
- public final boolean exec() {
- Thread.interrupted();
- runner = Thread.currentThread();
- root.tryComplete(callable);
- runner = null;
- Thread.interrupted();
- return true;
- }
- public final boolean cancel(boolean mayInterruptIfRunning) {
- Thread t;
- boolean stat = super.cancel(false);
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
- }
- }
- return stat;
- }
- public final void setRawResult(E v) {} // unused
- public final E getRawResult() { return null; }
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return invokeAll(tasks, 0L);
+ }
+ // for jdk version < 22, replace with
+ // /**
+ // * @throws NullPointerException {@inheritDoc}
+ // * @throws RejectedExecutionException {@inheritDoc}
+ // */
+ // @Override
+ // public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+ // return invokeAllUninterruptibly(tasks);
+ // }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return invokeAll(tasks, (System.nanoTime() + unit.toNanos(timeout)) | 1L);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
- int n = tasks.size();
- if (n <= 0)
- throw new IllegalArgumentException();
- InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
- ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
try {
- for (Callable<T> c : tasks) {
- if (c == null)
- throw new NullPointerException();
- InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
- fs.add(f);
- poolSubmit(true, f);
- if (root.isDone())
- break;
- }
- return root.get();
- } finally {
- for (InvokeAnyTask<T> f : fs)
- ForkJoinTask.cancelIgnoringExceptions(f);
+ return new ForkJoinTask.InvokeAnyRoot<T>()
+ .invokeAny(tasks, this, false, 0L);
+ } catch (TimeoutException cannotHappen) {
+ assert false;
+ return null;
}
}
@@ -3138,27 +3381,8 @@ public class ForkJoinPool extends AbstractExecutorService {
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- long nanos = unit.toNanos(timeout);
- int n = tasks.size();
- if (n <= 0)
- throw new IllegalArgumentException();
- InvokeAnyRoot<T> root = new InvokeAnyRoot<T>(n, this);
- ArrayList<InvokeAnyTask<T>> fs = new ArrayList<>(n);
- try {
- for (Callable<T> c : tasks) {
- if (c == null)
- throw new NullPointerException();
- InvokeAnyTask<T> f = new InvokeAnyTask<T>(root, c);
- fs.add(f);
- poolSubmit(true, f);
- if (root.isDone())
- break;
- }
- return root.get(nanos, TimeUnit.NANOSECONDS);
- } finally {
- for (InvokeAnyTask<T> f : fs)
- ForkJoinTask.cancelIgnoringExceptions(f);
- }
+ return new ForkJoinTask.InvokeAnyRoot<T>()
+ .invokeAny(tasks, this, true, unit.toNanos(timeout));
}
/**
@@ -3264,7 +3488,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return canStop();
+ return quiescent();
}
/**
@@ -3339,7 +3563,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if there are any queued submissions
*/
public boolean hasQueuedSubmissions() {
- return hasTasks(true);
+ WorkQueue[] qs; WorkQueue q;
+ if ((runState & STOP) == 0 && (qs = queues) != null) {
+ for (int i = 0; i < qs.length; i += 2) {
+ if ((q = qs[i]) != null && q.queueSize() > 0)
+ return true;
+ }
+ }
+ return false;
}
/**
@@ -3388,6 +3619,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public String toString() {
// Use a single pass through queues to collect counts
+ int e = runState;
long st = stealCount;
long qt = 0L, ss = 0L; int rc = 0;
WorkQueue[] qs; WorkQueue q;
@@ -3413,10 +3645,9 @@ public class ForkJoinPool extends AbstractExecutorService {
int ac = (short)(c >>> RC_SHIFT);
if (ac < 0) // ignore transient negative
ac = 0;
- int rs = runState;
- String level = ((rs & TERMINATED) != 0 ? "Terminated" :
- (rs & STOP) != 0 ? "Terminating" :
- (rs & SHUTDOWN) != 0 ? "Shutting down" :
+ String level = ((e & TERMINATED) != 0 ? "Terminated" :
+ (e & STOP) != 0 ? "Terminating" :
+ (e & SHUTDOWN) != 0 ? "Shutting down" :
"Running");
return super.toString() +
"[" + level +
@@ -3446,7 +3677,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public void shutdown() {
checkPermission();
- tryTerminate(false, true);
+ if (workerNamePrefix != null) // not common pool
+ tryTerminate(false, true);
}
/**
@@ -3469,7 +3701,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public List<Runnable> shutdownNow() {
checkPermission();
- tryTerminate(true, true);
+ if (workerNamePrefix != null) // not common pool
+ tryTerminate(true, true);
return Collections.emptyList();
}
@@ -3479,7 +3712,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- return (runState & TERMINATED) != 0;
+ return (tryTerminate(false, false) & TERMINATED) != 0;
}
/**
@@ -3496,7 +3729,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- return (runState & (STOP | TERMINATED)) == STOP;
+ return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP;
}
/**
@@ -3505,7 +3738,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return runState != 0;
+ return (runState & SHUTDOWN) != 0;
}
/**
@@ -3524,30 +3757,19 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
- ReentrantLock lock; Condition cond; boolean terminated;
long nanos = unit.toNanos(timeout);
- if ((config & ISCOMMON) != 0) {
+ CountDownLatch done;
+ if (workerNamePrefix == null) { // is common pool
if (helpQuiescePool(this, nanos, true) < 0)
throw new InterruptedException();
- terminated = false;
- }
- else if (!(terminated = ((runState & TERMINATED) != 0))) {
- tryTerminate(false, false); // reduce transient blocking
- if ((lock = registrationLock) != null &&
- !(terminated = (((runState & TERMINATED) != 0)))) {
- lock.lock();
- try {
- if ((cond = termination) == null)
- termination = cond = lock.newCondition();
- while (!(terminated = ((runState & TERMINATED) != 0)) &&
- nanos > 0L)
- nanos = cond.awaitNanos(nanos);
- } finally {
- lock.unlock();
- }
- }
+ return false;
}
- return terminated;
+ else if ((tryTerminate(false, false) & TERMINATED) != 0 ||
+ (done = terminationSignal()) == null ||
+ (runState & TERMINATED) != 0)
+ return true;
+ else
+ return done.await(nanos, TimeUnit.NANOSECONDS);
}
/**
@@ -3591,25 +3813,24 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public void close() {
- if ((config & ISCOMMON) == 0) {
- boolean terminated = tryTerminate(false, false);
- if (!terminated) {
- shutdown();
- boolean interrupted = false;
- while (!terminated) {
+ if (workerNamePrefix != null) {
+ checkPermission();
+ CountDownLatch done = null;
+ boolean interrupted = false;
+ while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
+ if (done == null)
+ done = terminationSignal();
+ else {
try {
- terminated = awaitTermination(1L, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- if (!interrupted) {
- shutdownNow();
- interrupted = true;
- }
+ done.await();
+ break;
+ } catch (InterruptedException ex) {
+ interrupted = true;
}
}
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
@@ -3728,18 +3949,20 @@ public class ForkJoinPool extends AbstractExecutorService {
/** ManagedBlock for ForkJoinWorkerThreads */
private void compensatedBlock(ManagedBlocker blocker)
throws InterruptedException {
- if (blocker == null) throw new NullPointerException();
+ Objects.requireNonNull(blocker);
for (;;) {
int comp; boolean done;
long c = ctl;
if (blocker.isReleasable())
break;
- if ((comp = tryCompensate(c, false)) >= 0) {
- long post = (comp == 0) ? 0L : RC_UNIT;
+ if ((runState & STOP) != 0)
+ throw new InterruptedException();
+ if ((comp = tryCompensate(c)) >= 0) {
try {
done = blocker.block();
} finally {
- getAndAddCtl(post);
+ if (comp > 0)
+ getAndAddCtl(RC_UNIT);
}
if (done)
break;
@@ -3753,22 +3976,17 @@ public class ForkJoinPool extends AbstractExecutorService {
* blocking operation is done then endCompensatedBlock must be invoked
* with the value returned by this method to re-adjust the parallelism.
*/
- private long beginCompensatedBlock() {
- for (;;) {
- int comp;
- if ((comp = tryCompensate(ctl, false)) >= 0) {
- return (comp == 0) ? 0L : RC_UNIT;
- } else {
- Thread.onSpinWait();
- }
- }
+ final long beginCompensatedBlock() {
+ int c;
+ do {} while ((c = tryCompensate(ctl)) < 0);
+ return (c == 0) ? 0L : RC_UNIT;
}
/**
* Re-adjusts parallelism after a blocking operation completes.
*/
void endCompensatedBlock(long post) {
- if (post > 0) {
+ if (post > 0L) {
getAndAddCtl(post);
}
}
@@ -3776,22 +3994,22 @@ public class ForkJoinPool extends AbstractExecutorService {
/** ManagedBlock for external threads */
private static void unmanagedBlock(ManagedBlocker blocker)
throws InterruptedException {
- if (blocker == null) throw new NullPointerException();
+ Objects.requireNonNull(blocker);
do {} while (!blocker.isReleasable() && !blocker.block());
}
- // AbstractExecutorService.newTaskFor overrides rely on
- // undocumented fact that ForkJoinTask.adapt returns ForkJoinTasks
- // that also implement RunnableFuture.
-
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
+ return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+ new ForkJoinTask.AdaptedRunnable<T>(runnable, value) :
+ new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new ForkJoinTask.AdaptedCallable<T>(callable);
+ return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
+ new ForkJoinTask.AdaptedCallable<T>(callable) :
+ new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
}
static {
@@ -3806,8 +4024,15 @@ public class ForkJoinPool extends AbstractExecutorService {
}
CTL = U.objectFieldOffset(klass, "ctl");
RUNSTATE = U.objectFieldOffset(klass, "runState");
- PARALLELISM = U.objectFieldOffset(klass, "parallelism");
+ PARALLELISM = U.objectFieldOffset(klass, "parallelism");
THREADIDS = U.objectFieldOffset(klass, "threadIds");
+ TERMINATION = U.objectFieldOffset(klass, "termination");
+ Class<ForkJoinTask[]> aklass = ForkJoinTask[].class;
+ ABASE = U.arrayBaseOffset(aklass);
+ int scale = U.arrayIndexScale(aklass);
+ ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("array index scale not a power of two");
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index e0737cde89d..8706359cda2 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.RandomAccess;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.misc.Unsafe;
@@ -208,20 +209,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* See the internal documentation of class ForkJoinPool for a
* general implementation overview. ForkJoinTasks are mainly
* responsible for maintaining their "status" field amidst relays
- * to methods in ForkJoinWorkerThread and ForkJoinPool.
+ * to methods in ForkJoinWorkerThread and ForkJoinPool, along with
+ * recording and reporting exceptions. The status field mainly
+ * holds bits recording completion status. Note that there is no
+ * status bit representing "running", recording whether incomplete
+ * tasks are queued vs executing. However these cases can be
+ * distinguished in subclasses of InterruptibleTask that adds this
+ * capability by recording the running thread. Cancellation is
+ * recorded in status bits (ABNORMAL but not THROWN), but reported
+ * in joining methods by throwing an exception. Other exceptions
+ * of completed (THROWN) tasks are recorded in the "aux" field,
+ * but are reconstructed (in getException) to produce more useful
+ * stack traces when reported. Sentinels for interruptions or
+ * timeouts while waiting for completion are not recorded as
+ * status bits but are included in return values of methods in
+ * which they occur.
*
* The methods of this class are more-or-less layered into
* (1) basic status maintenance
* (2) execution and awaiting completion
* (3) user-level methods that additionally report results.
+ * (4) Subclasses for adaptors and internal usages
* This is sometimes hard to see because this file orders exported
* methods in a way that flows well in javadocs.
- *
- * Revision notes: This class uses jdk-internal Unsafe for atomics
- * and special memory modes, rather than VarHandles, to avoid
- * initialization dependencies in other jdk components that
- * require early parallelism. It also simplifies handling of
- * pool-submitted tasks, among other minor improvements.
*/
/**
@@ -231,9 +241,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* waiters. Cancelled waiters try to unsplice.
*/
static final class Aux {
- final Thread thread;
- final Throwable ex; // null if a waiter
- Aux next; // accessed only via memory-acquire chains
+ Thread thread; // thrower or waiter
+ final Throwable ex;
+ Aux next; // accessed only via memory-acquire chains
Aux(Thread thread, Throwable ex) {
this.thread = thread;
this.ex = ex;
@@ -259,17 +269,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* control bits occupy only (some of) the upper half (16 bits) of
* status field. The lower bits are used for user-defined tags.
*/
- static final int DONE = 1 << 31; // must be negative
- static final int ABNORMAL = 1 << 16;
- static final int THROWN = 1 << 17;
- static final int SMASK = 0xffff; // short bits for tags
- static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
- static final int POOLSUBMIT = 1 << 18; // for pool.submit vs fork
-
- // flags for awaitDone (in addition to above)
- static final int RAN = 1;
- static final int INTERRUPTIBLE = 2;
- static final int TIMED = 4;
+ static final int DONE = 1 << 31; // must be negative
+ static final int ABNORMAL = 1 << 16;
+ static final int THROWN = 1 << 17;
+ static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;
+ static final int MARKER = 1 << 30; // utility marker
+ static final int SMASK = 0xffff; // short bits for tags
+ static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel
// Fields
volatile int status; // accessed directly by pool and workers
@@ -285,25 +291,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private boolean casStatus(int c, int v) {
return U.compareAndSetInt(this, STATUS, c, v);
}
+
+ // Support for waiting and signalling
+
private boolean casAux(Aux c, Aux v) {
return U.compareAndSetReference(this, AUX, c, v);
}
-
- /**
- * Marks this task as an external pool submission.
- */
- final void markPoolSubmission() {
- getAndBitwiseOrStatus(POOLSUBMIT);
+ private Aux compareAndExchangeAux(Aux c, Aux v) {
+ return (Aux)U.compareAndExchangeReference(this, AUX, c, v);
}
-
/** Removes and unparks waiters */
private void signalWaiters() {
- for (Aux a; (a = aux) != null && a.ex == null; ) {
- if (casAux(a, null)) { // detach entire list
- for (Thread t; a != null; a = a.next) {
- if ((t = a.thread) != Thread.currentThread() && t != null)
- LockSupport.unpark(t); // don't self-signal
- }
+ for (Aux a = aux;;) {
+ if (a == null || a.ex != null)
+ break;
+ if (a == (a = compareAndExchangeAux(a, null))) {
+ do { // detach entire list
+ LockSupport.unpark(a.thread);
+ } while ((a = a.next) != null);
break;
}
}
@@ -311,23 +316,27 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* Sets DONE status and wakes up threads waiting to join this task.
- * @return status on exit
*/
- private int setDone() {
- int s = getAndBitwiseOrStatus(DONE) | DONE;
+ private void setDone() {
+ getAndBitwiseOrStatus(DONE);
signalWaiters();
- return s;
}
/**
* Sets ABNORMAL DONE status unless already done, and wakes up threads
* waiting to join this task.
- * @return status on exit
+ * @return previous status
*/
- private int trySetCancelled() {
+ final int trySetCancelled() {
int s;
- do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
- signalWaiters();
+ for (;;) {
+ if ((s = status) < 0)
+ break;
+ if (casStatus(s, s | (DONE | ABNORMAL))) {
+ signalWaiters();
+ break;
+ }
+ }
return s;
}
@@ -337,175 +346,175 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* If losing a race with setDone or trySetCancelled, the exception
* may be recorded but not reported.
*
- * @return status on exit
+ * @return true if set
*/
- final int trySetThrown(Throwable ex) {
- Aux h = new Aux(Thread.currentThread(), ex), p = null;
- boolean installed = false;
+ final boolean trySetThrown(Throwable ex) {
int s;
- while ((s = status) >= 0) {
- Aux a;
- if (!installed && ((a = aux) == null || a.ex == null) &&
- (installed = casAux(a, h)))
- p = a; // list of waiters replaced by h
- if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
- break;
+ boolean set = false, installed = false;
+ if ((s = status) >= 0) {
+ Aux a, p = null, h = new Aux(Thread.currentThread(), ex);
+ do {
+ if (!installed && ((a = aux) == null || a.ex == null) &&
+ (installed = casAux(a, h)))
+ p = a; // list of waiters replaced by h
+ if (installed && (set = casStatus(s, s | HAVE_EXCEPTION)))
+ break;
+ } while ((s = status) >= 0);
+ for (; p != null; p = p.next)
+ LockSupport.unpark(p.thread);
}
- for (; p != null; p = p.next)
- LockSupport.unpark(p.thread);
- return s;
+ return set;
}
/**
- * Records exception unless already done. Overridable in subclasses.
- *
- * @return status on exit
+ * Overridable action on setting exception
*/
- int trySetException(Throwable ex) {
- return trySetThrown(ex);
+ void onAuxExceptionSet(Throwable ex) {
}
/**
- * Constructor for subclasses to call.
+ * Tries to set exception, if so invoking onAuxExceptionSet
*/
- public ForkJoinTask() {}
-
- static boolean isExceptionalStatus(int s) { // needed by subclasses
- return (s & THROWN) != 0;
+ final void trySetException(Throwable ex) {
+ if (trySetThrown(ex))
+ onAuxExceptionSet(ex);
}
- /**
- * Unless done, calls exec and records status if completed, but
- * doesn't wait for completion otherwise.
+ /*
+ * Waits for signal, interrupt, timeout, or pool termination.
*
- * @return status on exit from this method
- */
- final int doExec() {
- int s; boolean completed;
+ * @param pool if nonnull, the pool of ForkJoinWorkerThread caller
+ * @param compensation result from a helping method
+ * @param interruptible if wait is interruptible
+ * @param deadline if nonzero, timeout deadline
+ * @return ABNORMAL if interrupted, 0 on timeout, else status on exit
+ */
+ private int awaitDone(ForkJoinPool pool, int compensation,
+ boolean interruptible, long deadline) {
+ int s;
if ((s = status) >= 0) {
- try {
- completed = exec();
- } catch (Throwable rex) {
- s = trySetException(rex);
- completed = false;
- }
- if (completed)
- s = setDone();
- }
- return s;
- }
-
- /**
- * Helps and/or waits for completion from join, get, or invoke;
- * called from either internal or external threads.
- *
- * @param how flags for POOLSUBMIT, RAN, INTERRUPTIBLE, TIMED
- * @param deadline if timed, timeout deadline
- * @return ABNORMAL if interrupted, else status on exit
- */
- private int awaitDone(int how, long deadline) {
- int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool p;
- ForkJoinPool.WorkQueue q = null;
- boolean timed = (how & TIMED) != 0;
- boolean owned = false, uncompensate = false;
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- owned = true;
- q = (wt = (ForkJoinWorkerThread)t).workQueue;
- p = wt.pool;
- }
- else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0)
- q = p.externalQueue();
- if (q != null && p != null) { // try helping
- if (this instanceof CountedCompleter)
- s = p.helpComplete(this, q, owned, timed);
- else if ((how & RAN) != 0 ||
- (s = q.tryRemoveAndExec(this, owned)) >= 0)
- s = (owned) ? p.helpJoin(this, q, timed) : 0;
- if (s < 0)
- return s;
- if (s == UNCOMPENSATE)
- uncompensate = true;
- }
- Aux node = null;
- long ns = 0L;
- boolean interrupted = false, queued = false;
- for (;;) { // install node and await signal
- Aux a;
- if ((s = status) < 0)
- break;
- else if (node == null)
+ Aux node = null;
+ try { // spinwait if out of memory
node = new Aux(Thread.currentThread(), null);
- else if (!queued) {
- if (((a = aux) == null || a.ex == null) &&
- (queued = casAux(node.next = a, node)))
- LockSupport.setCurrentBlocker(this);
+ } catch (OutOfMemoryError ex) {
}
- else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
- s = 0;
- break;
- }
- else if (Thread.interrupted()) {
- interrupted = true;
- if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
- cancelIgnoringExceptions(this); // cancel on shutdown
- else if ((how & INTERRUPTIBLE) != 0) {
- s = ABNORMAL;
+ boolean queued = false;
+ for (Aux a;;) { // try to install node
+ if ((s = status) < 0)
+ break;
+ else if (node == null)
+ Thread.onSpinWait();
+ else if (((a = aux) == null || a.ex == null) &&
+ (queued = casAux(node.next = a, node)))
break;
- }
}
- else if ((s = status) < 0) // recheck
- break;
- else if (timed)
- LockSupport.parkNanos(ns);
- else
- LockSupport.park();
- }
- if (uncompensate)
- p.uncompensate();
-
- if (queued) {
- LockSupport.setCurrentBlocker(null);
- if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
- outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
- for (Aux trail = null;;) {
+ if (queued) { // await signal or interrupt
+ LockSupport.setCurrentBlocker(this);
+ int interrupts = 0; // < 0 : throw; > 0 : re-interrupt
+ for (;;) {
+ if ((s = status) < 0)
+ break;
+ else if (interrupts < 0) {
+ s = ABNORMAL; // interrupted and not done
+ break;
+ }
+ else if (Thread.interrupted()) {
+ if (!ForkJoinPool.poolIsStopping(pool))
+ interrupts = interruptible ? -1 : 1;
+ else {
+ interrupts = 1; // re-assert if cleared
+ try {
+ cancel(true);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+ else if (deadline != 0L) {
+ long ns;
+ if ((ns = deadline - System.nanoTime()) <= 0) {
+ s = 0;
+ break;
+ }
+ LockSupport.parkNanos(ns);
+ }
+ else
+ LockSupport.park();
+ }
+ node.thread = null; // help clean aux; raciness OK
+ clean: for (Aux a;;) { // remove node if still present
+ if ((a = aux) == null || a.ex != null)
+ break;
+ for (Aux prev = null;;) {
Aux next = a.next;
if (a == node) {
- if (trail != null)
- trail.casNext(trail, next);
+ if (prev != null)
+ prev.casNext(prev, next);
else if (casAux(a, next))
- break outer; // cannot be re-encountered
- break; // restart
- } else {
- trail = a;
- if ((a = next) == null)
- break outer;
+ break clean;
+ break; // check for failed or stale CAS
}
+ prev = a;
+ if ((a = next) == null)
+ break clean; // not found
}
}
- }
- else {
- signalWaiters(); // help clean or signal
- if (interrupted)
+ LockSupport.setCurrentBlocker(null);
+ if (interrupts > 0)
Thread.currentThread().interrupt();
}
}
+ if (compensation == UNCOMPENSATE && pool != null)
+ pool.uncompensate();
return s;
}
/**
- * Cancels, ignoring any exceptions thrown by cancel. Cancel is
- * spec'ed not to throw any exceptions, but if it does anyway, we
- * have no recourse, so guard against this case.
+ * Tries applicable helping steps while joining this task,
+ * otherwise invokes blocking version of awaitDone. Called only
+ * when pre-checked not to be done, and pre-screened for
+ * interrupts and timeouts, if applicable.
+ *
+ * @param interruptible if wait is interruptible
+ * @param deadline if nonzero, timeout deadline
+ * @return ABNORMAL if interrupted, else status on exit
*/
- static final void cancelIgnoringExceptions(Future<?> t) {
- if (t != null) {
+ private int awaitDone(boolean interruptible, long deadline) {
+ ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;
+ Thread t; boolean internal; int s;
+ if (internal =
+ (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ p = (wt = (ForkJoinWorkerThread)t).pool;
+ q = wt.workQueue;
+ }
+ else
+ q = ForkJoinPool.externalQueue(p = ForkJoinPool.common);
+ return (((s = (p == null) ? 0 :
+ ((this instanceof CountedCompleter) ?
+ p.helpComplete(this, q, internal) :
+ (this instanceof InterruptibleTask) && !internal ? status :
+ p.helpJoin(this, q, internal))) < 0)) ? s :
+ awaitDone(internal ? p : null, s, interruptible, deadline);
+ }
+
+ /**
+ * Runs a task body: Unless done, calls exec and records status if
+ * completed, but doesn't wait for completion otherwise.
+ */
+ final void doExec() {
+ if (status >= 0) {
+ boolean completed = false;
try {
- t.cancel(true);
- } catch (Throwable ignore) {
+ completed = exec();
+ } catch (Throwable rex) {
+ trySetException(rex);
}
+ if (completed)
+ setDone();
}
}
+ // Reporting Exceptions
+
/**
* Returns a rethrowable exception for this task, if available.
* To provide accurate stack traces, if the exception was not
@@ -518,13 +527,19 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* still correct, although it may contain a misleading stack
* trace.
*
+ * @param asExecutionException true if wrap as ExecutionException
* @return the exception, or null if none
*/
- private Throwable getThrowableException() {
- Throwable ex; Aux a;
- if ((a = aux) == null)
- ex = null;
- else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
+ private Throwable getException(boolean asExecutionException) {
+ int s; Throwable ex; Aux a;
+ if ((s = status) >= 0 || (s & ABNORMAL) == 0)
+ return null;
+ else if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) {
+ ex = new CancellationException();
+ if (!asExecutionException || !(this instanceof InterruptibleTask))
+ return ex; // else wrap below
+ }
+ else if (a.thread != Thread.currentThread()) {
try {
Constructor<?> noArgCtor = null, oneArgCtor = null;
for (Constructor<?> c : ex.getClass().getConstructors()) {
@@ -546,40 +561,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} catch (Exception ignore) {
}
}
- return ex;
- }
-
- /**
- * Returns exception associated with the given status, or null if none.
- */
- private Throwable getException(int s) {
- Throwable ex = null;
- if ((s & ABNORMAL) != 0 && (ex = getThrowableException()) == null)
- ex = new CancellationException();
- return ex;
- }
-
- /**
- * Throws exception associated with the given status, or
- * CancellationException if none recorded.
- */
- private void reportException(int s) {
- ForkJoinTask.<RuntimeException>uncheckedThrow(getThrowableException());
+ return (asExecutionException) ? new ExecutionException(ex) : ex;
}
/**
- * Throws exception for (timed or untimed) get, wrapping if
- * necessary in an ExecutionException.
+ * Throws thrown exception, or CancellationException if none
+ * recorded.
*/
- private void reportExecutionException(int s) {
- Throwable ex = null, rx;
- if (s == ABNORMAL)
- ex = new InterruptedException();
- else if (s >= 0)
- ex = new TimeoutException();
- else if ((rx = getThrowableException()) != null)
- ex = new ExecutionException(rx);
- ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
+ private void reportException(boolean asExecutionException) {
+ ForkJoinTask.<RuntimeException>
+ uncheckedThrow(getException(asExecutionException));
}
/**
@@ -603,9 +594,30 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
throw (T)t; // rely on vacuous cast
}
+ // Utilities shared among ForkJoinTask, ForkJoinPool
+
+ /**
+ * Sets MARKER bit, returning nonzero if previously set
+ */
+ final int setForkJoinTaskStatusMarkerBit() {
+ return getAndBitwiseOrStatus(MARKER) & MARKER;
+ }
+
+ /**
+ * Returns nonzero if MARKER bit set.
+ */
+ final int getForkJoinTaskStatusMarkerBit() {
+ return status & MARKER;
+ }
+
// public methods
/**
+ * Constructor for subclasses to call.
+ */
+ public ForkJoinTask() {}
+
+ /**
* Arranges to asynchronously execute this task in the pool the
* current task is running in, if applicable, or using the {@link
* ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
@@ -622,15 +634,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final ForkJoinTask<V> fork() {
Thread t; ForkJoinWorkerThread wt;
- ForkJoinPool p; ForkJoinPool.WorkQueue q;
- U.storeStoreFence(); // ensure safely publishable
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- p = (wt = (ForkJoinWorkerThread)t).pool;
- q = wt.workQueue;
+ ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;
+ if (internal =
+ (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ q = (wt = (ForkJoinWorkerThread)t).workQueue;
+ p = wt.pool;
}
else
- q = (p = ForkJoinPool.common).submissionQueue(false);
- q.push(this, p, true);
+ q = (p = ForkJoinPool.common).externalSubmissionQueue();
+ q.push(this, p, internal);
return this;
}
@@ -647,10 +659,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final V join() {
int s;
- if ((s = status) >= 0)
- s = awaitDone(s & POOLSUBMIT, 0L);
- if ((s & ABNORMAL) != 0)
- reportException(s);
+ if ((((s = status) < 0 ? s : awaitDone(false, 0L)) & ABNORMAL) != 0)
+ reportException(false);
return getRawResult();
}
@@ -663,12 +673,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the computed result
*/
public final V invoke() {
- int s;
- if ((s = doExec()) >= 0)
- s = awaitDone(RAN, 0L);
- if ((s & ABNORMAL) != 0)
- reportException(s);
- return getRawResult();
+ doExec();
+ return join();
}
/**
@@ -693,18 +699,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (t1 == null || t2 == null)
throw new NullPointerException();
t2.fork();
- if ((s1 = t1.doExec()) >= 0)
- s1 = t1.awaitDone(RAN, 0L);
- if ((s1 & ABNORMAL) != 0) {
- cancelIgnoringExceptions(t2);
- t1.reportException(s1);
- }
- else {
- if ((s2 = t2.status) >= 0)
- s2 = t2.awaitDone(0, 0L);
- if ((s2 & ABNORMAL) != 0)
- t2.reportException(s2);
+ t1.doExec();
+ if ((((s1 = t1.status) < 0 ? s1 :
+ t1.awaitDone(false, 0L)) & ABNORMAL) != 0) {
+ t2.cancel(false);
+ t1.reportException(false);
}
+ else if ((((s2 = t2.status) < 0 ? s2 :
+ t2.awaitDone(false, 0L)) & ABNORMAL) != 0)
+ t2.reportException(false);
}
/**
@@ -726,36 +729,36 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
- ForkJoinTask<?> t;
+ ForkJoinTask<?> t; int s;
if ((t = tasks[i]) == null) {
ex = new NullPointerException();
break;
}
if (i == 0) {
- int s;
- if ((s = t.doExec()) >= 0)
- s = t.awaitDone(RAN, 0L);
- if ((s & ABNORMAL) != 0)
- ex = t.getException(s);
+ t.doExec();
+ if ((((s = t.status) < 0 ? s :
+ t.awaitDone(false, 0L)) & ABNORMAL) != 0)
+ ex = t.getException(false);
break;
}
t.fork();
}
if (ex == null) {
for (int i = 1; i <= last; ++i) {
- ForkJoinTask<?> t;
- if ((t = tasks[i]) != null) {
- int s;
- if ((s = t.status) >= 0)
- s = t.awaitDone(0, 0L);
- if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
- break;
- }
+ ForkJoinTask<?> t; int s;
+ if ((t = tasks[i]) != null &&
+ ((((s = t.status) < 0 ? s :
+ t.awaitDone(false, 0L)) & ABNORMAL) != 0) &&
+ (ex = t.getException(false)) != null)
+ break;
}
}
if (ex != null) {
- for (int i = 1; i <= last; ++i)
- cancelIgnoringExceptions(tasks[i]);
+ for (int i = 1; i <= last; ++i) {
+ ForkJoinTask<?> t;
+ if ((t = tasks[i]) != null)
+ t.cancel(false);
+ }
rethrow(ex);
}
}
@@ -789,36 +792,36 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Throwable ex = null;
int last = ts.size() - 1; // nearly same as array version
for (int i = last; i >= 0; --i) {
- ForkJoinTask<?> t;
+ ForkJoinTask<?> t; int s;
if ((t = ts.get(i)) == null) {
ex = new NullPointerException();
break;
}
if (i == 0) {
- int s;
- if ((s = t.doExec()) >= 0)
- s = t.awaitDone(RAN, 0L);
- if ((s & ABNORMAL) != 0)
- ex = t.getException(s);
+ t.doExec();
+ if ((((s = t.status) < 0 ? s :
+ t.awaitDone(false, 0L)) & ABNORMAL) != 0)
+ ex = t.getException(false);
break;
}
t.fork();
}
if (ex == null) {
for (int i = 1; i <= last; ++i) {
- ForkJoinTask<?> t;
- if ((t = ts.get(i)) != null) {
- int s;
- if ((s = t.status) >= 0)
- s = t.awaitDone(0, 0L);
- if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
- break;
- }
+ ForkJoinTask<?> t; int s;
+ if ((t = ts.get(i)) != null &&
+ ((((s = t.status) < 0 ? s :
+ t.awaitDone(false, 0L)) & ABNORMAL) != 0) &&
+ (ex = t.getException(false)) != null)
+ break;
}
}
if (ex != null) {
- for (int i = 1; i <= last; ++i)
- cancelIgnoringExceptions(ts.get(i));
+ for (int i = 1; i <= last; ++i) {
+ ForkJoinTask<?> t;
+ if ((t = ts.get(i)) != null)
+ t.cancel(false);
+ }
rethrow(ex);
}
return tasks;
@@ -852,7 +855,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
+ int s = trySetCancelled();
+ return (s >= 0 || (s & (ABNORMAL | THROWN)) == ABNORMAL);
}
public final boolean isDone() {
@@ -894,16 +898,25 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@Override
public V resultNow() {
- if (!isCompletedNormally())
- throw new IllegalStateException();
+ int s = status;
+ if ((s & DONE) == 0)
+ throw new IllegalStateException("Task has not completed");
+ if ((s & ABNORMAL) != 0) {
+ if ((s & THROWN) != 0)
+ throw new IllegalStateException("Task completed with exception");
+ else
+ throw new IllegalStateException("Task was cancelled");
+ }
return getRawResult();
}
@Override
public Throwable exceptionNow() {
- if ((status & (ABNORMAL | THROWN)) != (ABNORMAL | THROWN))
+ Throwable ex;
+ if ((status & HAVE_EXCEPTION) != HAVE_EXCEPTION ||
+ (ex = getException(false)) == null)
throw new IllegalStateException();
- return getThrowableException();
+ return ex;
}
/**
@@ -914,7 +927,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or {@code null} if none
*/
public final Throwable getException() {
- return getException(status);
+ return getException(false);
}
/**
@@ -984,13 +997,14 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* member of a ForkJoinPool and was interrupted while waiting
*/
public final V get() throws InterruptedException, ExecutionException {
- int s;
- if (Thread.interrupted())
- s = ABNORMAL;
- else if ((s = status) >= 0)
- s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE, 0L);
- if ((s & ABNORMAL) != 0)
- reportExecutionException(s);
+ int stat = status;
+ int s = ((stat < 0) ? stat :
+ (Thread.interrupted()) ? ABNORMAL :
+ awaitDone(true, 0L));
+ if (s == ABNORMAL)
+ throw new InterruptedException();
+ else if ((s & ABNORMAL) != 0)
+ reportException(true);
return getRawResult();
}
@@ -1011,14 +1025,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long nanos = unit.toNanos(timeout);
- int s;
- if (Thread.interrupted())
- s = ABNORMAL;
- else if ((s = status) >= 0 && nanos > 0L)
- s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
- nanos + System.nanoTime());
- if (s >= 0 || (s & ABNORMAL) != 0)
- reportExecutionException(s);
+ int stat = status;
+ int s = ((stat < 0) ? stat :
+ (Thread.interrupted()) ? ABNORMAL :
+ (nanos <= 0L) ? 0 :
+ awaitDone(true, (System.nanoTime() + nanos) | 1L));
+ if (s == ABNORMAL)
+ throw new InterruptedException();
+ else if (s >= 0)
+ throw new TimeoutException();
+ else if ((s & ABNORMAL) != 0)
+ reportException(true);
return getRawResult();
}
@@ -1029,9 +1046,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* known to have aborted.
*/
public final void quietlyJoin() {
- int s;
- if ((s = status) >= 0)
- awaitDone(s & POOLSUBMIT, 0L);
+ if (status >= 0)
+ awaitDone(false, 0L);
}
/**
@@ -1040,9 +1056,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* exception.
*/
public final void quietlyInvoke() {
- int s;
- if ((s = doExec()) >= 0)
- awaitDone(RAN, 0L);
+ doExec();
+ if (status >= 0)
+ awaitDone(false, 0L);
}
/**
@@ -1059,17 +1075,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final boolean quietlyJoin(long timeout, TimeUnit unit)
throws InterruptedException {
- int s;
long nanos = unit.toNanos(timeout);
- if (Thread.interrupted())
- s = ABNORMAL;
- else if ((s = status) >= 0 && nanos > 0L)
- s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
- nanos + System.nanoTime());
+ int stat = status;
+ int s = ((stat < 0) ? stat :
+ (Thread.interrupted()) ? ABNORMAL :
+ (nanos <= 0L) ? 0 :
+ awaitDone(true, (System.nanoTime() + nanos) | 1L));
if (s == ABNORMAL)
throw new InterruptedException();
- else
- return (s < 0);
+ return (s < 0);
}
/**
@@ -1086,11 +1100,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s;
long nanos = unit.toNanos(timeout);
if ((s = status) >= 0 && nanos > 0L)
- s = awaitDone((s & POOLSUBMIT) | TIMED, nanos + System.nanoTime());
+ s = awaitDone(false, (System.nanoTime() + nanos) | 1L);
return (s < 0);
}
/**
+ * Utility for possibly-timed ForkJoinPool.invokeAll
+ */
+ final void quietlyJoinPoolInvokeAllTask(long deadline)
+ throws InterruptedException {
+ int s;
+ if ((s = status) >= 0) {
+ if (Thread.interrupted())
+ s = ABNORMAL;
+ else if (deadline == 0L || deadline - System.nanoTime() > 0L)
+ s = awaitDone(true, deadline);
+ if (s == ABNORMAL)
+ throw new InterruptedException();
+ else if (s >= 0)
+ cancel(true);
+ }
+ }
+
+ /**
* Possibly executes tasks until the pool hosting the current task
* {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
* method may be of use in designs in which many tasks are forked,
@@ -1160,12 +1192,13 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- Thread t; ForkJoinPool.WorkQueue q; boolean owned;
- if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ Thread t; ForkJoinPool.WorkQueue q; boolean internal;
+ if (internal =
+ (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonQueue();
- return (q != null && q.tryUnpush(this, owned));
+ return (q != null && q.tryUnpush(this, internal));
}
/**
@@ -1361,6 +1394,147 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
+ // Factory methods for adaptors below
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code run}
+ * method of the given {@code Runnable} as its action, and returns
+ * a null result upon {@link #join}.
+ *
+ * @param runnable the runnable action
+ * @return the task
+ */
+ public static ForkJoinTask<?> adapt(Runnable runnable) {
+ return new AdaptedRunnableAction(runnable);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code run}
+ * method of the given {@code Runnable} as its action, and returns
+ * the given result upon {@link #join}.
+ *
+ * @param runnable the runnable action
+ * @param result the result upon completion
+ * @param <T> the type of the result
+ * @return the task
+ */
+ public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
+ return new AdaptedRunnable<T>(runnable, result);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code call}
+ * method of the given {@code Callable} as its action, and returns
+ * its result upon {@link #join}, translating any checked exceptions
+ * encountered into {@code RuntimeException}.
+ *
+ * @param callable the callable action
+ * @param <T> the type of the callable's result
+ * @return the task
+ */
+ public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
+ return new AdaptedCallable<T>(callable);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code call}
+ * method of the given {@code Callable} as its action, and returns
+ * its result upon {@link #join}, translating any checked exceptions
+ * encountered into {@code RuntimeException}. Additionally,
+ * invocations of {@code cancel} with {@code mayInterruptIfRunning
+ * true} will attempt to interrupt the thread performing the task.
+ *
+ * @param callable the callable action
+ * @param <T> the type of the callable's result
+ * @return the task
+ *
+ * @since 19
+ */
+ public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
+ return new AdaptedInterruptibleCallable<T>(callable);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code run}
+ * method of the given {@code Runnable} as its action, and returns
+ * the given result upon {@link #join}, translating any checked exceptions
+ * encountered into {@code RuntimeException}. Additionally,
+ * invocations of {@code cancel} with {@code mayInterruptIfRunning
+ * true} will attempt to interrupt the thread performing the task.
+ *
+ * @param runnable the runnable action
+ * @param result the result upon completion
+ * @param <T> the type of the result
+ * @return the task
+ *
+ * @since 22
+ */
+ public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result) {
+ return new AdaptedInterruptibleRunnable<T>(runnable, result);
+ }
+
+ /**
+ * Returns a new {@code ForkJoinTask} that performs the {@code
+ * run} method of the given {@code Runnable} as its action, and
+ * returns null upon {@link #join}, translating any checked
+ * exceptions encountered into {@code RuntimeException}.
+ * Additionally, invocations of {@code cancel} with {@code
+ * mayInterruptIfRunning true} will attempt to interrupt the
+ * thread performing the task.
+ *
+ * @param runnable the runnable action
+ * @return the task
+ *
+ * @since 22
+ */
+ public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {
+ return new AdaptedInterruptibleRunnable<Void>(runnable, null);
+ }
+
+ // Serialization support
+
+ private static final long serialVersionUID = -7721805057305804111L;
+
+ /**
+ * Saves this task to a stream (that is, serializes it).
+ *
+ * @param s the stream
+ * @throws java.io.IOException if an I/O error occurs
+ * @serialData the current run status and the exception thrown
+ * during execution, or {@code null} if none
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ Aux a;
+ s.defaultWriteObject();
+ s.writeObject((a = aux) == null ? null : a.ex);
+ }
+
+ /**
+ * Reconstitutes this task from a stream (that is, deserializes it).
+ * @param s the stream
+ * @throws ClassNotFoundException if the class of a serialized object
+ * could not be found
+ * @throws java.io.IOException if an I/O error occurs
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ Object ex = s.readObject();
+ if (ex != null)
+ aux = new Aux(Thread.currentThread(), (Throwable)ex);
+ }
+
+ static {
+ U = Unsafe.getUnsafe();
+ STATUS = U.objectFieldOffset(ForkJoinTask.class, "status");
+ AUX = U.objectFieldOffset(ForkJoinTask.class, "aux");
+ Class<?> dep1 = LockSupport.class; // ensure loaded
+ Class<?> dep2 = Aux.class;
+ }
+
+ // Special subclasses for adaptors and internal tasks
+
/**
* Adapter for Runnables. This implements RunnableFuture
* to be compliant with AbstractExecutorService constraints
@@ -1373,7 +1547,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedRunnable(Runnable runnable, T result) {
- if (runnable == null) throw new NullPointerException();
+ Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result; // OK to set this even before completion
}
@@ -1395,7 +1569,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
- if (runnable == null) throw new NullPointerException();
+ Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
@@ -1409,34 +1583,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
- * Adapter for Runnables in which failure forces worker exception.
- */
- static final class RunnableExecuteAction extends ForkJoinTask<Void> {
- @SuppressWarnings("serial") // Conditionally serializable
- final Runnable runnable;
- RunnableExecuteAction(Runnable runnable) {
- if (runnable == null) throw new NullPointerException();
- this.runnable = runnable;
- }
- public final Void getRawResult() { return null; }
- public final void setRawResult(Void v) { }
- public final boolean exec() { runnable.run(); return true; }
- int trySetException(Throwable ex) { // if a handler, invoke it
- int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h;
- if (isExceptionalStatus(s = trySetThrown(ex)) &&
- (h = ((t = Thread.currentThread()).
- getUncaughtExceptionHandler())) != null) {
- try {
- h.uncaughtException(t, ex);
- } catch (Throwable ignore) {
- }
- }
- return s;
- }
- private static final long serialVersionUID = 5232453952276885070L;
- }
-
- /**
* Adapter for Callables.
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
@@ -1446,7 +1592,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedCallable(Callable<? extends T> callable) {
- if (callable == null) throw new NullPointerException();
+ Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
@@ -1468,152 +1614,238 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static final long serialVersionUID = 2838392045355241008L;
}
- static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
+ /**
+ * Tasks with semantics conforming to ExecutorService conventions.
+ * Tasks are interruptible when cancelled, including cases of
+ * cancellation upon pool termination. In addition to recording
+ * the running thread to enable interrupt in cancel(true), the
+ * task checks for termination before executing the compute
+ * method, to cover shutdown races in which the task has not yet
+ * been cancelled on entry and might not otherwise be cancelled by
+ * others.
+ */
+ static abstract class InterruptibleTask<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
- @SuppressWarnings("serial") // Conditionally serializable
- final Callable<? extends T> callable;
transient volatile Thread runner;
- @SuppressWarnings("serial") // Conditionally serializable
- T result;
- AdaptedInterruptibleCallable(Callable<? extends T> callable) {
- if (callable == null) throw new NullPointerException();
- this.callable = callable;
- }
- public final T getRawResult() { return result; }
- public final void setRawResult(T v) { result = v; }
+ abstract T compute() throws Exception;
public final boolean exec() {
Thread.interrupted();
- runner = Thread.currentThread();
+ Thread t = runner = Thread.currentThread();
try {
- if (!isDone()) // recheck
- result = callable.call();
- return true;
- } catch (RuntimeException rex) {
- throw rex;
- } catch (Exception ex) {
- throw new RuntimeException(ex);
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ForkJoinPool.poolIsStopping(((ForkJoinWorkerThread)t).pool))
+ cancel(true);
+ else {
+ try {
+ if (status >= 0)
+ setRawResult(compute());
+ } catch (Exception ex) {
+ trySetException(ex);
+ }
+ }
} finally {
runner = null;
- Thread.interrupted();
}
+ return true;
}
- public final void run() { invoke(); }
- public final boolean cancel(boolean mayInterruptIfRunning) {
+ public boolean cancel(boolean mayInterruptIfRunning) {
Thread t;
- boolean stat = super.cancel(false);
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
+ if (trySetCancelled() >= 0) {
+ if (mayInterruptIfRunning && (t = runner) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
+ }
}
+ return true;
}
- return stat;
+ return isCancelled();
}
+ public final void run() { quietlyInvoke(); }
+ Object adaptee() { return null; } // for printing and diagnostics
public String toString() {
- return super.toString() + "[Wrapped task = " + callable + "]";
+ Object a = adaptee();
+ String s = super.toString();
+ return ((a == null) ? s :
+ (s + "[Wrapped task = " + a.toString() + "]"));
}
private static final long serialVersionUID = 2838392045355241008L;
}
/**
- * Returns a new {@code ForkJoinTask} that performs the {@code run}
- * method of the given {@code Runnable} as its action, and returns
- * a null result upon {@link #join}.
- *
- * @param runnable the runnable action
- * @return the task
+ * Adapter for Callable-based interruptible tasks.
*/
- public static ForkJoinTask<?> adapt(Runnable runnable) {
- return new AdaptedRunnableAction(runnable);
+ static final class AdaptedInterruptibleCallable<T> extends InterruptibleTask<T> {
+ @SuppressWarnings("serial") // Conditionally serializable
+ final Callable<? extends T> callable;
+ @SuppressWarnings("serial") // Conditionally serializable
+ T result;
+ AdaptedInterruptibleCallable(Callable<? extends T> callable) {
+ Objects.requireNonNull(callable);
+ this.callable = callable;
+ }
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ final T compute() throws Exception { return callable.call(); }
+ final Object adaptee() { return callable; }
+ private static final long serialVersionUID = 2838392045355241008L;
}
/**
- * Returns a new {@code ForkJoinTask} that performs the {@code run}
- * method of the given {@code Runnable} as its action, and returns
- * the given result upon {@link #join}.
- *
- * @param runnable the runnable action
- * @param result the result upon completion
- * @param <T> the type of the result
- * @return the task
+ * Adapter for Runnable-based interruptible tasks.
*/
- public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
- return new AdaptedRunnable<T>(runnable, result);
+ static final class AdaptedInterruptibleRunnable<T> extends InterruptibleTask<T> {
+ @SuppressWarnings("serial") // Conditionally serializable
+ final Runnable runnable;
+ @SuppressWarnings("serial") // Conditionally serializable
+ final T result;
+ AdaptedInterruptibleRunnable(Runnable runnable, T result) {
+ Objects.requireNonNull(runnable);
+ this.runnable = runnable;
+ this.result = result;
+ }
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { }
+ final T compute() { runnable.run(); return result; }
+ final Object adaptee() { return runnable; }
+ private static final long serialVersionUID = 2838392045355241008L;
}
/**
- * Returns a new {@code ForkJoinTask} that performs the {@code call}
- * method of the given {@code Callable} as its action, and returns
- * its result upon {@link #join}, translating any checked exceptions
- * encountered into {@code RuntimeException}.
- *
- * @param callable the callable action
- * @param <T> the type of the callable's result
- * @return the task
+ * Adapter for Runnables in which failure forces worker exception.
*/
- public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
- return new AdaptedCallable<T>(callable);
+ static final class RunnableExecuteAction extends InterruptibleTask<Void> {
+ @SuppressWarnings("serial") // Conditionally serializable
+ final Runnable runnable;
+ RunnableExecuteAction(Runnable runnable) {
+ Objects.requireNonNull(runnable);
+ this.runnable = runnable;
+ }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ final Void compute() { runnable.run(); return null; }
+ final Object adaptee() { return runnable; }
+ void onAuxExceptionSet(Throwable ex) { // if a handler, invoke it
+ Thread t; java.lang.Thread.UncaughtExceptionHandler h;
+ if ((h = ((t = Thread.currentThread()).
+ getUncaughtExceptionHandler())) != null) {
+ try {
+ h.uncaughtException(t, ex);
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
}
/**
- * Returns a new {@code ForkJoinTask} that performs the {@code call}
- * method of the given {@code Callable} as its action, and returns
- * its result upon {@link #join}, translating any checked exceptions
- * encountered into {@code RuntimeException}. Additionally,
- * invocations of {@code cancel} with {@code mayInterruptIfRunning
- * true} will attempt to interrupt the thread performing the task.
- *
- * @param callable the callable action
- * @param <T> the type of the callable's result
- * @return the task
- *
- * @since 19
- */
- public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
- // https://bugs.openjdk.org/browse/JDK-8246587
- return new AdaptedInterruptibleCallable<T>(callable);
- }
-
- // Serialization support
-
- private static final long serialVersionUID = -7721805057305804111L;
+ * Task (that is never forked) to hold results for
+ * ForkJoinPool.invokeAny, or to report exception if all subtasks
+ * fail or are cancelled or the pool is terminating. Both
+ * InvokeAnyRoot and InvokeAnyTask objects exist only transiently
+ * during invokeAny invocations, so serialization support would be
+ * nonsensical and is omitted.
+ */
+ @SuppressWarnings("serial")
+ static final class InvokeAnyRoot<T> extends InterruptibleTask<T> {
+ volatile T result;
+ volatile int count; // number of tasks; decremented in case all tasks fail
+ InvokeAnyRoot() { }
+ final void tryComplete(InvokeAnyTask<T> f, T v, Throwable ex,
+ boolean completed) {
+ if (f != null && !isDone()) {
+ if (ForkJoinPool.poolIsStopping(getPool()))
+ trySetCancelled();
+ else if (f.setForkJoinTaskStatusMarkerBit() == 0) {
+ if (completed) {
+ result = v;
+ quietlyComplete();
+ }
+ else if (U.getAndAddInt(this, COUNT, -1) <= 1) {
+ if (ex == null)
+ trySetCancelled();
+ else
+ trySetException(ex);
+ }
+ }
+ }
+ }
+ public final T compute() { return null; } // never forked
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { }
+
+ // Common support for timed and untimed versions of invokeAny
+ final T invokeAny(Collection<? extends Callable<T>> tasks,
+ ForkJoinPool pool, boolean timed, long nanos)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if ((count = tasks.size()) <= 0)
+ throw new IllegalArgumentException();
+ if (pool == null)
+ throw new NullPointerException();
+ InvokeAnyTask<T> t = null; // list of submitted tasks
+ try {
+ for (Callable<T> c : tasks)
+ pool.execute((ForkJoinTask<?>)
+ (t = new InvokeAnyTask<T>(c, this, t)));
+ return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
+ } finally {
+ for (; t != null; t = t.pred)
+ t.onRootCompletion();
+ }
+ }
- /**
- * Saves this task to a stream (that is, serializes it).
- *
- * @param s the stream
- * @throws java.io.IOException if an I/O error occurs
- * @serialData the current run status and the exception thrown
- * during execution, or {@code null} if none
- */
- private void writeObject(java.io.ObjectOutputStream s)
- throws java.io.IOException {
- Aux a;
- s.defaultWriteObject();
- s.writeObject((a = aux) == null ? null : a.ex);
+ private static final Unsafe U;
+ private static final long COUNT;
+ static {
+ U = Unsafe.getUnsafe();
+ COUNT = U.objectFieldOffset(InvokeAnyRoot.class, "count");
+ }
}
/**
- * Reconstitutes this task from a stream (that is, deserializes it).
- * @param s the stream
- * @throws ClassNotFoundException if the class of a serialized object
- * could not be found
- * @throws java.io.IOException if an I/O error occurs
+ * Task with results in InvokeAnyRoot (and never independently
+ * joined).
*/
- private void readObject(java.io.ObjectInputStream s)
- throws java.io.IOException, ClassNotFoundException {
- s.defaultReadObject();
- Object ex = s.readObject();
- if (ex != null)
- trySetThrown((Throwable)ex);
- }
-
- static {
- U = Unsafe.getUnsafe();
- STATUS = U.objectFieldOffset(ForkJoinTask.class, "status");
- AUX = U.objectFieldOffset(ForkJoinTask.class, "aux");
- Class<?> dep1 = LockSupport.class; // ensure loaded
- Class<?> dep2 = Aux.class;
+ @SuppressWarnings("serial")
+ static final class InvokeAnyTask<T> extends InterruptibleTask<Void> {
+ final Callable<? extends T> callable;
+ final InvokeAnyRoot<T> root;
+ final InvokeAnyTask<T> pred; // to traverse on cancellation
+ InvokeAnyTask(Callable<T> callable, InvokeAnyRoot<T> root,
+ InvokeAnyTask<T> pred) {
+ Objects.requireNonNull(callable);
+ this.callable = callable;
+ this.root = root;
+ this.pred = pred;
+ }
+ final Void compute() throws Exception {
+ InvokeAnyRoot<T> r = root;
+ T v = null; Throwable ex = null; boolean completed = false;
+ if (r != null && !r.isDone()) {
+ try {
+ v = callable.call();
+ completed = true;
+ } catch (Exception rex) {
+ ex = rex;
+ } finally {
+ r.tryComplete(this, v, ex, completed);
+ }
+ }
+ return null;
+ }
+ public final boolean cancel(boolean mayInterruptIfRunning) {
+ InvokeAnyRoot<T> r;
+ boolean stat = super.cancel(mayInterruptIfRunning);
+ if ((r = root) != null)
+ r.tryComplete(this, null, null, false);
+ return stat;
+ }
+ final void onRootCompletion() {
+ if (!isDone())
+ super.cancel(true); // no need for tryComplete
+ }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ final Object adaptee() { return callable; }
}
-
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index f7281998251..c4fc3de9750 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -76,9 +76,8 @@ public class ForkJoinWorkerThread extends Thread {
boolean clearThreadLocals) {
super(group, null, pool.nextWorkerThreadName(), 0L, !clearThreadLocals);
UncaughtExceptionHandler handler = (this.pool = pool).ueh;
- this.workQueue = new ForkJoinPool.WorkQueue(this, 0);
- if (clearThreadLocals)
- workQueue.setClearThreadLocals();
+ this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config,
+ clearThreadLocals);
super.setDaemon(true);
if (handler != null)
super.setUncaughtExceptionHandler(handler);
diff --git a/test/jdk/ProblemList.txt b/test/jdk/ProblemList.txt
index ec34e1f461b..8ab03ce62fb 100644
--- a/test/jdk/ProblemList.txt
+++ b/test/jdk/ProblemList.txt
@@ -723,8 +723,9 @@ com/sun/jdi/InvokeHangTest.java 8218463 linux-al
java/util/Locale/LocaleProvidersRun.java 8268379 macosx-x64
sun/util/locale/provider/CalendarDataRegression.java 8268379 macosx-x64
-java/util/concurrent/forkjoin/AsyncShutdownNow.java 8286352 linux-all,windows-x64
-java/util/concurrent/ExecutorService/CloseTest.java 8288899 macosx-aarch64
+java/util/concurrent/SynchronousQueue/Fairness.java 8300663 generic-x64
+java/util/Locale/LocaleProvidersRun.java 8268379 macosx-x64
+sun/util/locale/provider/CalendarDataRegression.java 8268379 macosx-x64
############################################################################
diff --git a/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java b/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java
index 193a4677354..162d4f7d1b4 100644
--- a/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java
+++ b/test/jdk/java/util/concurrent/ExecutorService/CloseTest.java
@@ -23,9 +23,9 @@
/*
* @test
- * @summary Test ExecutorService.close, including default implementation
+ * @summary Test implementations of ExecutorService.close
* @library ../lib
- * @run testng CloseTest
+ * @run junit CloseTest
*/
import java.time.Duration;
@@ -37,37 +37,32 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import static org.testng.Assert.*;
-
-public class CloseTest {
-
- @DataProvider(name = "executors")
- public Object[][] executors() {
- var defaultThreadFactory = Executors.defaultThreadFactory();
- var virtualThreadFactory = Thread.ofVirtual().factory();
- return new Object[][] {
- // ensures that default close method is tested
- { new DelegatingExecutorService(Executors.newCachedThreadPool()), },
-
- // implementations that may override close
- { new ForkJoinPool(), },
- { Executors.newFixedThreadPool(1), },
- { Executors.newCachedThreadPool(), },
- { Executors.newThreadPerTaskExecutor(defaultThreadFactory), },
- { Executors.newThreadPerTaskExecutor(virtualThreadFactory), },
- };
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.*;
+
+class CloseTest {
+
+ static Stream<ExecutorService> executors() {
+ return Stream.of(
+ // ensures that default close method is tested
+ new DelegatingExecutorService(Executors.newCachedThreadPool()),
+
+ // implementations that may override close
+ Executors.newCachedThreadPool(),
+ Executors.newVirtualThreadPerTaskExecutor(),
+ new ForkJoinPool()
+ );
}
/**
* Test close with no tasks running.
*/
- @Test(dataProvider = "executors")
- public void testCloseWithNoTasks(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCloseWithNoTasks(ExecutorService executor) throws Exception {
executor.close();
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
@@ -77,24 +72,109 @@ public class CloseTest {
/**
* Test close with tasks running.
*/
- @Test(dataProvider = "executors")
- public void testCloseWithRunningTasks(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCloseWithRunningTasks(ExecutorService executor) throws Exception {
+ Phaser phaser = new Phaser(2);
Future<?> future = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
Thread.sleep(Duration.ofMillis(100));
return "foo";
});
+ phaser.arriveAndAwaitAdvance(); // wait for task to start
+
executor.close(); // waits for task to complete
+ assertFalse(Thread.interrupted());
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
- assertEquals(future.resultNow(), "foo");
+ assertEquals("foo", future.resultNow());
+ }
+
+ /**
+ * Test shutdown with tasks running.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testShutdownWithRunningTasks(ExecutorService executor) throws Exception {
+ Phaser phaser = new Phaser(2);
+ Future<?> future = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(Duration.ofMillis(100));
+ return "foo";
+ });
+ phaser.arriveAndAwaitAdvance(); // wait for task to start
+
+ executor.shutdown();
+ assertFalse(Thread.interrupted());
+ assertTrue(executor.isShutdown());
+ assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+ assertTrue(executor.isTerminated());
+ assertEquals("foo", future.resultNow());
+ }
+
+ /**
+ * Test close with multiple tasks running
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCloseWith2RunningTasks(ExecutorService executor) throws Exception {
+ Phaser phaser = new Phaser(3);
+ Future<?> f1 = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(Duration.ofMillis(100));
+ return "foo";
+ });
+ Future<?> f2 = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(Duration.ofMillis(100));
+ return "bar";
+ });
+ phaser.arriveAndAwaitAdvance(); // wait for tasks to start
+
+ executor.close(); // waits for task to complete
+ assertFalse(Thread.interrupted());
+ assertTrue(executor.isShutdown());
+ assertTrue(executor.isTerminated());
+ assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
+ assertEquals("foo", f1.resultNow());
+ assertEquals("bar", f2.resultNow());
+ }
+
+ /**
+ * Test shutdown with multiple tasks running
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testShutdownWith2RunningTasks(ExecutorService executor) throws Exception {
+ Phaser phaser = new Phaser(3);
+ Future<?> f1 = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(Duration.ofMillis(100));
+ return "foo";
+ });
+ Future<?> f2 = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(Duration.ofMillis(100));
+ return "bar";
+ });
+ phaser.arriveAndAwaitAdvance(); // wait for tasks to start
+
+ executor.shutdown();
+ assertFalse(Thread.interrupted());
+ assertTrue(executor.isShutdown());
+ assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+ assertTrue(executor.isTerminated());
+ assertEquals("foo", f1.resultNow());
+ assertEquals("bar", f2.resultNow());
}
/**
* Test close when executor is shutdown but not terminated.
*/
- @Test(dataProvider = "executors")
- public void testShutdownBeforeClose(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testShutdownBeforeClose(ExecutorService executor) throws Exception {
Phaser phaser = new Phaser(2);
Future<?> future = executor.submit(() -> {
phaser.arriveAndAwaitAdvance();
@@ -104,22 +184,22 @@ public class CloseTest {
phaser.arriveAndAwaitAdvance(); // wait for task to start
executor.shutdown(); // shutdown, will not immediately terminate
-
executor.close();
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
- assertEquals(future.resultNow(), "foo");
+ Object s = future.resultNow();
+ assertEquals("foo", s);
}
/**
* Test close when terminated.
*/
- @Test(dataProvider = "executors")
- public void testTerminateBeforeClose(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testTerminateBeforeClose(ExecutorService executor) throws Exception {
executor.shutdown();
assertTrue(executor.isTerminated());
-
executor.close();
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
@@ -129,8 +209,9 @@ public class CloseTest {
/**
* Test invoking close with interrupt status set.
*/
- @Test(dataProvider = "executors")
- public void testInterruptBeforeClose(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInterruptBeforeClose(ExecutorService executor) throws Exception {
Phaser phaser = new Phaser(2);
Future<?> future = executor.submit(() -> {
phaser.arriveAndAwaitAdvance();
@@ -149,21 +230,29 @@ public class CloseTest {
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
- expectThrows(ExecutionException.class, future::get);
+ assertThrows(ExecutionException.class, future::get);
}
/**
* Test interrupting thread blocked in close.
*/
- @Test(dataProvider = "executors")
- public void testInterruptDuringClose(ExecutorService executor) throws Exception {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInterruptDuringClose(ExecutorService executor) throws Exception {
+ Phaser phaser = new Phaser(2);
Future<?> future = executor.submit(() -> {
+ phaser.arriveAndAwaitAdvance();
Thread.sleep(Duration.ofDays(1));
return null;
});
+ phaser.arriveAndAwaitAdvance(); // wait for task to start
+
+ // schedule main thread to be interrupted
Thread thread = Thread.currentThread();
new Thread(() -> {
- try { Thread.sleep( Duration.ofMillis(500)); } catch (Exception ignore) { }
+ try {
+ Thread.sleep( Duration.ofMillis(100));
+ } catch (Exception ignore) { }
thread.interrupt();
}).start();
try {
@@ -175,6 +264,6 @@ public class CloseTest {
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
assertTrue(executor.awaitTermination(10, TimeUnit.MILLISECONDS));
- expectThrows(ExecutionException.class, future::get);
+ assertThrows(ExecutionException.class, future::get);
}
}
diff --git a/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java b/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java
new file mode 100644
index 00000000000..a9a4eb0d8d7
--- /dev/null
+++ b/test/jdk/java/util/concurrent/ExecutorService/InvokeTest.java
@@ -0,0 +1,787 @@
+/*
+ * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/**
+ * @test
+ * @summary Test implementations of ExecutorService.invokeAll/invokeAny
+ * @run junit InvokeTest
+ */
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import static java.lang.Thread.State.*;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.*;
+
+class InvokeTest {
+
+ private static ScheduledExecutorService scheduler;
+
+ @BeforeAll
+ static void setup() throws Exception {
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @AfterAll
+ static void shutdown() {
+ scheduler.shutdown();
+ }
+
+ private static Stream<ExecutorService> executors() {
+ return Stream.of(
+ Executors.newCachedThreadPool(),
+ Executors.newVirtualThreadPerTaskExecutor(),
+ new ForkJoinPool()
+ );
+ }
+
+ /**
+ * Test invokeAny where all tasks complete normally.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny1(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> "bar";
+ String result = executor.invokeAny(List.of(task1, task2));
+ assertTrue(Set.of("foo", "bar").contains(result));
+ }
+ }
+
+ /**
+ * Test invokeAny where all tasks complete normally. The completion of the
+ * first task should cancel remaining tasks.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny2(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ String result = executor.invokeAny(List.of(task1, task2));
+ assertEquals("foo", result);
+
+ // if task2 started then it should have been interrupted
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test invokeAny where all tasks complete with exception.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny3(ExecutorService executor) throws Exception {
+ try (executor) {
+ class FooException extends Exception { }
+ Callable<String> task1 = () -> { throw new FooException(); };
+ Callable<String> task2 = () -> { throw new FooException(); };
+ try {
+ executor.invokeAny(List.of(task1, task2));
+ fail("invokeAny did not throw");
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue(cause instanceof FooException);
+ }
+ }
+ }
+
+ /**
+ * Test invokeAny where all tasks complete with exception. The completion
+ * of the last task is delayed.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny4(ExecutorService executor) throws Exception {
+ try (executor) {
+ class FooException extends Exception { }
+ Callable<String> task1 = () -> { throw new FooException(); };
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMillis(50));
+ throw new FooException();
+ };
+ try {
+ executor.invokeAny(List.of(task1, task2));
+ fail("invokeAny did not throw");
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ assertTrue(cause instanceof FooException);
+ }
+ }
+ }
+
+ /**
+ * Test invokeAny where some, not all, tasks complete normally.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny5(ExecutorService executor) throws Exception {
+ try (executor) {
+ class FooException extends Exception { }
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> { throw new FooException(); };
+ String result = executor.invokeAny(List.of(task1, task2));
+ assertEquals("foo", result);
+ }
+ }
+
+ /**
+ * Test invokeAny where some, not all, tasks complete normally. The first
+ * task to complete normally is delayed.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAny6(ExecutorService executor) throws Exception {
+ try (executor) {
+ class FooException extends Exception { }
+ Callable<String> task1 = () -> {
+ Thread.sleep(Duration.ofMillis(50));
+ return "foo";
+ };
+ Callable<String> task2 = () -> { throw new FooException(); };
+ String result = executor.invokeAny(List.of(task1, task2));
+ assertEquals("foo", result);
+ }
+ }
+
+ /**
+ * Test timed-invokeAny where all tasks complete normally before the timeout.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyWithTimeout1(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> "bar";
+ String result = executor.invokeAny(List.of(task1, task2), 1, TimeUnit.MINUTES);
+ assertTrue(Set.of("foo", "bar").contains(result));
+ }
+ }
+
+ /**
+ * Test timed-invokeAny where one task completes normally before the timeout.
+ * The remaining tests should be cancelled.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyWithTimeout2(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ String result = executor.invokeAny(List.of(task1, task2), 1, TimeUnit.MINUTES);
+ assertEquals("foo", result);
+
+ // if task2 started then it should have been interrupted
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test timed-invokeAny where timeout expires before any task completes.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyWithTimeout3(ExecutorService executor) throws Exception {
+ try (executor) {
+ var task1Started = new AtomicBoolean();
+ var task1Interrupted = new CountDownLatch(1);
+ Callable<String> task1 = () -> {
+ task1Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task1Interrupted.countDown();
+ }
+ return null;
+ };
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ // invokeAny should throw TimeoutException
+ assertThrows(TimeoutException.class,
+ () -> executor.invokeAny(List.of(task1, task2), 100, TimeUnit.MILLISECONDS));
+
+ // tasks that started should be interrupted
+ if (task1Started.get()) {
+ task1Interrupted.await();
+ }
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test invokeAny with interrupt status set.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyWithInterruptSet(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> {
+ Thread.sleep(Duration.ofMinutes(1));
+ return "foo";
+ };
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMinutes(1));
+ return "bar";
+ };
+ Thread.currentThread().interrupt();
+ try {
+ executor.invokeAny(List.of(task1, task2));
+ fail("invokeAny did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ }
+ }
+
+ /**
+ * Test interrupting a thread blocked in invokeAny.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInterruptInvokeAny(ExecutorService executor) throws Exception {
+ try (executor) {
+ var task1Started = new AtomicBoolean();
+ var task1Interrupted = new CountDownLatch(1);
+ Callable<String> task1 = () -> {
+ task1Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task1Interrupted.countDown();
+ }
+ return null;
+ };
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ scheduleInterruptAt("invokeAny");
+ try {
+ executor.invokeAny(List.of(task1, task2));
+ fail("invokeAny did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+
+ // tasks that started should be interrupted
+ if (task1Started.get()) {
+ task1Interrupted.await();
+ }
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test invokeAny after ExecutorService has been shutdown.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyAfterShutdown(ExecutorService executor) throws Exception {
+ executor.shutdown();
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> "bar";
+ assertThrows(RejectedExecutionException.class,
+ () -> executor.invokeAny(List.of(task1, task2)));
+ }
+
+ /**
+ * Test invokeAny with empty collection.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyEmpty1(ExecutorService executor) throws Exception {
+ try (executor) {
+ assertThrows(IllegalArgumentException.class, () -> executor.invokeAny(List.of()));
+ }
+ }
+
+ /**
+ * Test timed-invokeAny with empty collection.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyEmpty2(ExecutorService executor) throws Exception {
+ try (executor) {
+ assertThrows(IllegalArgumentException.class,
+ () -> executor.invokeAny(List.of(), 1, TimeUnit.MINUTES));
+ }
+ }
+
+ /**
+ * Test invokeAny with null.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyNull1(ExecutorService executor)throws Exception {
+ try (executor) {
+ assertThrows(NullPointerException.class, () -> executor.invokeAny(null));
+ }
+ }
+
+ /**
+ * Test invokeAny with null element
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAnyNull2(ExecutorService executor)throws Exception {
+ try (executor) {
+ List<Callable<String>> list = new ArrayList<>();
+ list.add(() -> "foo");
+ list.add(null);
+ assertThrows(NullPointerException.class, () -> executor.invokeAny(null));
+ }
+ }
+
+ /**
+ * Test invokeAll where all tasks complete normally.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAll1(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMillis(50));
+ return "bar";
+ };
+
+ List<Future<String>> futures = executor.invokeAll(List.of(task1, task2));
+ assertTrue(futures.size() == 2);
+
+ // check results
+ List<String> results = futures.stream().map(Future::resultNow).toList();
+ assertEquals(results, List.of("foo", "bar"));
+ }
+ }
+
+ /**
+ * Test invokeAll where all tasks complete with exception.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAll2(ExecutorService executor) throws Exception {
+ try (executor) {
+ class FooException extends Exception { }
+ class BarException extends Exception { }
+ Callable<String> task1 = () -> { throw new FooException(); };
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMillis(50));
+ throw new BarException();
+ };
+
+ List<Future<String>> futures = executor.invokeAll(List.of(task1, task2));
+ assertTrue(futures.size() == 2);
+
+ // check results
+ Throwable e1 = assertThrows(ExecutionException.class, () -> futures.get(0).get());
+ assertTrue(e1.getCause() instanceof FooException);
+ Throwable e2 = assertThrows(ExecutionException.class, () -> futures.get(1).get());
+ assertTrue(e2.getCause() instanceof BarException);
+ }
+ }
+
+ /**
+ * Test invokeAll where all tasks complete normally before the timeout expires.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAll3(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMillis(50));
+ return "bar";
+ };
+
+ List<Future<String>> futures = executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES);
+ assertTrue(futures.size() == 2);
+
+ // check results
+ List<String> results = futures.stream().map(Future::resultNow).toList();
+ assertEquals(results, List.of("foo", "bar"));
+ }
+ }
+
+ /**
+ * Test invokeAll where some tasks do not complete before the timeout expires.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAll4(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ List<Future<String>> futures = executor.invokeAll(List.of(task1, task2), 1, TimeUnit.SECONDS);
+ assertTrue(futures.size() == 2);
+
+ // task1 should be done
+ assertTrue(futures.get(0).isDone());
+
+ // task2 should be cancelled and interrupted
+ assertTrue(futures.get(1).isCancelled());
+
+ // if task2 started then it should have been interrupted
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test invokeAll with interrupt status set.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllInterrupt1(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMinutes(1));
+ return "bar";
+ };
+
+ Thread.currentThread().interrupt();
+ try {
+ executor.invokeAll(List.of(task1, task2));
+ fail("invokeAll did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ }
+ }
+
+ /**
+ * Test timed-invokeAll with interrupt status set.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllInterrupt3(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> {
+ Thread.sleep(Duration.ofMinutes(1));
+ return "bar";
+ };
+
+ Thread.currentThread().interrupt();
+ try {
+ executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES);
+ fail("invokeAll did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+ }
+ }
+
+ /**
+ * Test interrupt with thread blocked in invokeAll.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllInterrupt4(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ scheduleInterruptAt("invokeAll");
+ try {
+ executor.invokeAll(List.of(task1, task2));
+ fail("invokeAll did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+
+ // if task2 started then it should have been interrupted
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test interrupt with thread blocked in timed-invokeAll.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllInterrupt6(ExecutorService executor) throws Exception {
+ try (executor) {
+ Callable<String> task1 = () -> "foo";
+
+ var task2Started = new AtomicBoolean();
+ var task2Interrupted = new CountDownLatch(1);
+ Callable<String> task2 = () -> {
+ task2Started.set(true);
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ task2Interrupted.countDown();
+ }
+ return null;
+ };
+
+ scheduleInterruptAt("invokeAll");
+ try {
+ executor.invokeAll(List.of(task1, task2), 1, TimeUnit.MINUTES);
+ fail("invokeAll did not throw");
+ } catch (InterruptedException expected) {
+ assertFalse(Thread.currentThread().isInterrupted());
+ } finally {
+ Thread.interrupted(); // clear interrupt
+ }
+
+ // if task2 started then it should have been interrupted
+ if (task2Started.get()) {
+ task2Interrupted.await();
+ }
+ }
+ }
+
+ /**
+ * Test invokeAll after ExecutorService has been shutdown.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllAfterShutdown1(ExecutorService executor) throws Exception {
+ executor.shutdown();
+
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> "bar";
+ assertThrows(RejectedExecutionException.class,
+ () -> executor.invokeAll(List.of(task1, task2)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllAfterShutdown2(ExecutorService executor) throws Exception {
+ executor.shutdown();
+
+ Callable<String> task1 = () -> "foo";
+ Callable<String> task2 = () -> "bar";
+ assertThrows(RejectedExecutionException.class,
+ () -> executor.invokeAll(List.of(task1, task2), 1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test invokeAll with empty collection.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllEmpty1(ExecutorService executor) throws Exception {
+ try (executor) {
+ List<Future<Object>> list = executor.invokeAll(List.of());
+ assertTrue(list.size() == 0);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllEmpty2(ExecutorService executor) throws Exception {
+ try (executor) {
+ List<Future<Object>> list = executor.invokeAll(List.of(), 1, TimeUnit.SECONDS);
+ assertTrue(list.size() == 0);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllNull1(ExecutorService executor)throws Exception {
+ try (executor) {
+ assertThrows(NullPointerException.class, () -> executor.invokeAll(null));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllNull2(ExecutorService executor)throws Exception {
+ try (executor) {
+ List<Callable<String>> tasks = new ArrayList<>();
+ tasks.add(() -> "foo");
+ tasks.add(null);
+ assertThrows(NullPointerException.class, () -> executor.invokeAll(tasks));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllNull3(ExecutorService executor)throws Exception {
+ try (executor) {
+ assertThrows(NullPointerException.class,
+ () -> executor.invokeAll(null, 1, TimeUnit.SECONDS));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllNull4(ExecutorService executor)throws Exception {
+ try (executor) {
+ Callable<String> task = () -> "foo";
+ assertThrows(NullPointerException.class,
+ () -> executor.invokeAll(List.of(task), 1, null));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testInvokeAllNull5(ExecutorService executor)throws Exception {
+ try (executor) {
+ List<Callable<String>> tasks = new ArrayList<>();
+ tasks.add(() -> "foo");
+ tasks.add(null);
+ assertThrows(NullPointerException.class,
+ () -> executor.invokeAll(tasks, 1, TimeUnit.SECONDS));
+ }
+ }
+
+ /**
+ * Schedules the current thread to be interrupted when it waits (timed or untimed)
+ * at the given method name.
+ */
+ private void scheduleInterruptAt(String methodName) {
+ Thread target = Thread.currentThread();
+ scheduler.submit(() -> {
+ try {
+ boolean found = false;
+ while (!found) {
+ Thread.State state = target.getState();
+ assertTrue(state != TERMINATED);
+ if ((state == WAITING || state == TIMED_WAITING)
+ && contains(target.getStackTrace(), methodName)) {
+ found = true;
+ } else {
+ Thread.sleep(20);
+ }
+ }
+ target.interrupt();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ /**
+ * Returns true if the given stack trace contains an element for the given method name.
+ */
+ private boolean contains(StackTraceElement[] stack, String methodName) {
+ return Arrays.stream(stack)
+ .anyMatch(e -> methodName.equals(e.getMethodName()));
+ }
+}
diff --git a/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java b/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java
new file mode 100644
index 00000000000..026d86a6c85
--- /dev/null
+++ b/test/jdk/java/util/concurrent/ExecutorService/SubmitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/**
+ * @test
+ * @summary Test implementations of ExecutorService.submit/execute
+ * @run junit SubmitTest
+ */
+
+import java.time.Duration;
+import java.util.concurrent.*;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.*;
+
+class SubmitTest {
+
+ private static Stream<ExecutorService> executors() {
+ return Stream.of(
+ Executors.newCachedThreadPool(),
+ Executors.newVirtualThreadPerTaskExecutor(),
+ new ForkJoinPool()
+ );
+ }
+
+ /**
+ * Test submit(Runnable) executes the task.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnable(ExecutorService executor) throws Exception {
+ try (executor) {
+ var latch = new CountDownLatch(1);
+ Future<?> future = executor.submit(latch::countDown);
+ latch.await();
+ assertNull(future.get());
+ }
+ }
+
+ /**
+ * Test submit(Runnable) throws if executor is shutdown.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnableAfterShutdown(ExecutorService executor) {
+ executor.shutdown();
+ assertThrows(RejectedExecutionException.class, () -> executor.submit(() -> { }));
+ }
+
+ /**
+ * Test task submitted with submit(Runnable) is not interrupted by cancel(false).
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnableWithCancelFalse(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var stop = new CountDownLatch(1);
+ var done = new CountDownLatch(1);
+ Future<?> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ stop.await();
+ } catch (InterruptedException e) {
+ // ignore
+ } finally {
+ done.countDown();
+ }
+ });
+
+ // wait for task to start
+ started.await();
+
+ // cancel(false), task should not be interrupted
+ future.cancel(false);
+ assertFalse(done.await(500, TimeUnit.MILLISECONDS));
+
+ // let task finish
+ stop.countDown();
+ }
+ }
+
+ /**
+ * Test task submitted with submit(Runnable) is interrupted by cancel(true).
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnableWithCancelTrue(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ Future<?> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ interrupted.countDown();
+ }
+ });
+
+ // wait for task to start
+ started.await();
+
+ // cancel(true), task should be interrupted
+ future.cancel(true);
+ interrupted.await();
+ }
+ }
+
+ /**
+ * Test task submitted with submit(Runnable) is interrupted if executor is
+ * stopped with shutdownNow.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnableWithShutdownNow(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ Future<?> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ interrupted.countDown();
+ }
+ });
+
+ // wait for task to start
+ started.await();
+
+ // shutdown forcefully, task should be interrupted
+ executor.shutdownNow();
+ interrupted.await();
+ }
+ }
+
+ /**
+ * Test submit(Runnable) throws if task is null.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitRunnableNull(ExecutorService executor) {
+ try (executor) {
+ Runnable nullTask = null;
+ assertThrows(NullPointerException.class, () -> executor.submit(nullTask));
+ assertThrows(NullPointerException.class, () -> executor.submit(nullTask, Void.class));
+ }
+ }
+
+ //
+
+ /**
+ * Test submit(Callable) executes the task.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallable(ExecutorService executor) throws Exception {
+ try (executor) {
+ var latch = new CountDownLatch(1);
+ Future<String> future = executor.submit(() -> {
+ latch.countDown();
+ return "foo";
+ });
+ latch.await();
+ assertEquals("foo", future.get());
+ }
+ }
+
+ /**
+ * Test submit(Callable) throws if executor is shutdown.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallableAfterShutdown(ExecutorService executor) {
+ executor.shutdown();
+ assertThrows(RejectedExecutionException.class, () -> executor.submit(() -> null));
+ }
+
+ /**
+ * Test task submitted with submit(Callable) is not interrupted by cancel(false).
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallableWithCancelFalse(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var stop = new CountDownLatch(1);
+ var done = new CountDownLatch(1);
+ Future<Void> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ stop.await();
+ } finally {
+ done.countDown();
+ }
+ return null;
+ });
+
+ // wait for task to start
+ started.await();
+
+ // cancel(false), task should not be interrupted
+ future.cancel(false);
+ assertFalse(done.await(500, TimeUnit.MILLISECONDS));
+
+ // let task finish
+ stop.countDown();
+ }
+ }
+
+ /**
+ * Test task submitted with submit(Callable) is interrupted by cancel(true).
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallableWithCancelTrue(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ Future<Void> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ interrupted.countDown();
+ }
+ return null;
+ });
+
+ // wait for task to start
+ started.await();
+
+ // cancel(true), task should be interrupted
+ future.cancel(true);
+ interrupted.await();
+ }
+ }
+
+ /**
+ * Test task submitted with submit(Callable) is interrupted if executor is
+ * stopped with shutdownNow.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallableWithShutdownNow(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ Future<Void> future = executor.submit(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ interrupted.countDown();
+ }
+ return null;
+ });
+
+ // wait for task to start
+ started.await();
+
+ // shutdown forcefully, task should be interrupted
+ executor.shutdownNow();
+ interrupted.await();
+ }
+ }
+
+ /**
+ * Test submit(Callable) throws if task is null.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testSubmitCallableNull(ExecutorService executor) {
+ try (executor) {
+ Callable<Void> nullTask = null;
+ assertThrows(NullPointerException.class, () -> executor.submit(nullTask));
+ }
+ }
+
+ //
+
+ /**
+ * Test execute(Runnable) executes the task.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testExecute(ExecutorService executor) throws Exception {
+ try (executor) {
+ var latch = new CountDownLatch(1);
+ executor.execute(latch::countDown);
+ latch.await();
+ }
+ }
+
+ /**
+ * Test execute(Runnable) throws if executor is shutdown.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testExecuteAfterShutdown(ExecutorService executor) {
+ executor.shutdown();
+ assertThrows(RejectedExecutionException.class, () -> executor.execute(() -> { }));
+ }
+
+ /**
+ * Test task submitted with execute(Runnable) is interrupted if executor is
+ * stopped with shutdownNow.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testExecuteWithShutdownNow(ExecutorService executor) throws Exception {
+ try (executor) {
+ var started = new CountDownLatch(1);
+ var interrupted = new CountDownLatch(1);
+ executor.execute(() -> {
+ started.countDown();
+ try {
+ Thread.sleep(Duration.ofDays(1));
+ } catch (InterruptedException e) {
+ interrupted.countDown();
+ }
+ });
+
+ // wait for task to start
+ started.await();
+
+ // shutdown forcefully, task should be interrupted
+ executor.shutdownNow();
+ interrupted.await();
+ }
+ }
+
+ /**
+ * Test execute(Runnable) throws if task is null.
+ */
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testExecuteNull(ExecutorService executor) {
+ try (executor) {
+ Runnable nullTask = null;
+ assertThrows(NullPointerException.class, () -> executor.execute(nullTask));
+ }
+ }
+}
diff --git a/test/jdk/java/util/concurrent/Future/DefaultMethods.java b/test/jdk/java/util/concurrent/Future/DefaultMethods.java
index 2571c568043..dfeb5bde64e 100644
--- a/test/jdk/java/util/concurrent/Future/DefaultMethods.java
+++ b/test/jdk/java/util/concurrent/Future/DefaultMethods.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -25,42 +25,50 @@
* @test
* @summary Test Future's default methods
* @library ../lib
- * @run testng DefaultMethods
+ * @run junit DefaultMethods
*/
-import java.util.concurrent.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
import static java.util.concurrent.Future.State.*;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.*;
-public class DefaultMethods {
+class DefaultMethods {
- @DataProvider(name = "executors")
- public Object[][] executors() {
- return new Object[][] {
- // ensures that default implementation is tested
- { new DelegatingExecutorService(Executors.newCachedThreadPool()), },
+ static Stream<ExecutorService> executors() {
+ return Stream.of(
+ // ensures that default close method is tested
+ new DelegatingExecutorService(Executors.newCachedThreadPool()),
- // executors that may return a Future that overrides the methods
- { new ForkJoinPool(), },
- { Executors.newCachedThreadPool(), }
- };
+ // executors that may return a Future that overrides the methods
+ Executors.newCachedThreadPool(),
+ Executors.newVirtualThreadPerTaskExecutor(),
+ new ForkJoinPool()
+ );
}
/**
* Test methods when the task has not completed.
*/
- @Test(dataProvider = "executors")
- public void testRunningTask(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testRunningTask(ExecutorService executor) {
try (executor) {
var latch = new CountDownLatch(1);
Future<?> future = executor.submit(() -> { latch.await(); return null; });
try {
assertTrue(future.state() == RUNNING);
- expectThrows(IllegalStateException.class, future::resultNow);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::exceptionNow);
} finally {
latch.countDown();
}
@@ -70,41 +78,44 @@ public class DefaultMethods {
/**
* Test methods when the task has already completed with a result.
*/
- @Test(dataProvider = "executors")
- public void testCompletedTask1(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCompletedTask1(ExecutorService executor) {
try (executor) {
Future<String> future = executor.submit(() -> "foo");
awaitDone(future);
assertTrue(future.state() == SUCCESS);
- assertEquals(future.resultNow(), "foo");
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertEquals("foo", future.resultNow());
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
}
/**
* Test methods when the task has already completed with null.
*/
- @Test(dataProvider = "executors")
- public void testCompletedTask2(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCompletedTask2(ExecutorService executor) {
try (executor) {
Future<String> future = executor.submit(() -> null);
awaitDone(future);
assertTrue(future.state() == SUCCESS);
- assertEquals(future.resultNow(), null);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertNull(future.resultNow());
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
}
/**
* Test methods when the task has completed with an exception.
*/
- @Test(dataProvider = "executors")
- public void testFailedTask(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testFailedTask(ExecutorService executor) {
try (executor) {
Future<?> future = executor.submit(() -> { throw new ArithmeticException(); });
awaitDone(future);
assertTrue(future.state() == FAILED);
- expectThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
Throwable ex = future.exceptionNow();
assertTrue(ex instanceof ArithmeticException);
}
@@ -113,16 +124,17 @@ public class DefaultMethods {
/**
* Test methods when the task has been cancelled (mayInterruptIfRunning=false)
*/
- @Test(dataProvider = "executors")
- public void testCancelledTask1(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCancelledTask1(ExecutorService executor) {
try (executor) {
var latch = new CountDownLatch(1);
Future<?> future = executor.submit(() -> { latch.await(); return null; });
future.cancel(false);
try {
assertTrue(future.state() == CANCELLED);
- expectThrows(IllegalStateException.class, future::resultNow);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::exceptionNow);
} finally {
latch.countDown();
}
@@ -132,16 +144,17 @@ public class DefaultMethods {
/**
* Test methods when the task has been cancelled (mayInterruptIfRunning=true)
*/
- @Test(dataProvider = "executors")
- public void testCancelledTask2(ExecutorService executor) {
+ @ParameterizedTest
+ @MethodSource("executors")
+ void testCancelledTask2(ExecutorService executor) {
try (executor) {
var latch = new CountDownLatch(1);
Future<?> future = executor.submit(() -> { latch.await(); return null; });
future.cancel(true);
try {
assertTrue(future.state() == CANCELLED);
- expectThrows(IllegalStateException.class, future::resultNow);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::exceptionNow);
} finally {
latch.countDown();
}
@@ -152,46 +165,46 @@ public class DefaultMethods {
* Test CompletableFuture with the task has not completed.
*/
@Test
- public void testCompletableFuture1() {
+ void testCompletableFuture1() {
var future = new CompletableFuture<String>();
assertTrue(future.state() == RUNNING);
- expectThrows(IllegalStateException.class, future::resultNow);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
/**
* Test CompletableFuture with the task that completed with result.
*/
@Test
- public void testCompletableFuture2() {
+ void testCompletableFuture2() {
var future = new CompletableFuture<String>();
future.complete("foo");
assertTrue(future.state() == SUCCESS);
- assertEquals(future.resultNow(), "foo");
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertEquals("foo", future.resultNow());
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
/**
* Test CompletableFuture with the task that completed with null.
*/
@Test
- public void testCompletableFuture3() {
+ void testCompletableFuture3() {
var future = new CompletableFuture<String>();
future.complete(null);
assertTrue(future.state() == SUCCESS);
- assertEquals(future.resultNow(), null);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertNull(future.resultNow());
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
/**
* Test CompletableFuture with the task that completed with exception.
*/
@Test
- public void testCompletableFuture4() {
+ void testCompletableFuture4() {
var future = new CompletableFuture<String>();
future.completeExceptionally(new ArithmeticException());
assertTrue(future.state() == FAILED);
- expectThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
Throwable ex = future.exceptionNow();
assertTrue(ex instanceof ArithmeticException);
}
@@ -200,12 +213,12 @@ public class DefaultMethods {
* Test CompletableFuture with the task that was cancelled.
*/
@Test
- public void testCompletableFuture5() {
+ void testCompletableFuture5() {
var future = new CompletableFuture<String>();
future.cancel(false);
assertTrue(future.state() == CANCELLED);
- expectThrows(IllegalStateException.class, future::resultNow);
- expectThrows(IllegalStateException.class, future::exceptionNow);
+ assertThrows(IllegalStateException.class, future::resultNow);
+ assertThrows(IllegalStateException.class, future::exceptionNow);
}
/**
diff --git a/test/jdk/java/util/concurrent/TEST.properties b/test/jdk/java/util/concurrent/TEST.properties
new file mode 100644
index 00000000000..74a024f2880
--- /dev/null
+++ b/test/jdk/java/util/concurrent/TEST.properties
@@ -0,0 +1 @@
+maxOutputSize=2000000
diff --git a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
index 7f7618a2189..5af31673167 100644
--- a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
+++ b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -23,11 +23,11 @@
/*
* @test
- * @run testng AsyncShutdownNow
- * @summary Test invoking shutdownNow with threads blocked in Future.get,
- * invokeAll, and invokeAny
+ * @summary Test ForkJoinPool.shutdownNow with threads blocked in invokeXXX and Future.get
+ * @run junit AsyncShutdownNow
*/
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
@@ -38,42 +38,39 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static java.lang.Thread.State.*;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import static org.junit.jupiter.api.Assertions.*;
-public class AsyncShutdownNow {
+class AsyncShutdownNow {
// long running interruptible task
private static final Callable<Void> SLEEP_FOR_A_DAY = () -> {
- Thread.sleep(86400_000);
+ Thread.sleep(Duration.ofDays(1));
return null;
};
- /**
- * The executors to test.
- */
- @DataProvider(name = "executors")
- public Object[][] executors() {
- return new Object[][] {
- { new ForkJoinPool() },
- { new ForkJoinPool(1) },
- };
+ static Stream<ForkJoinPool> pools() {
+ return Stream.of(
+ new ForkJoinPool(),
+ new ForkJoinPool(1)
+ );
}
/**
- * Test shutdownNow with running task and thread blocked in Future::get.
+ * Test shutdownNow with a running task and main thread blocked in Future::get.
*/
- @Test(dataProvider = "executors")
- public void testFutureGet(ExecutorService executor) throws Exception {
- System.out.format("testFutureGet: %s%n", executor);
- try (executor) {
- Future<?> future = executor.submit(SLEEP_FOR_A_DAY);
+ @ParameterizedTest
+ @MethodSource("pools")
+ void testFutureGet(ForkJoinPool pool) throws Exception {
+ try (pool) {
+ Future<?> future = pool.submit(SLEEP_FOR_A_DAY);
- // shutdownNow when main thread waits in ForkJoinTask.get
- onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow);
+ // shutdownNow when main thread waits in ForkJoinTask.awaitDone
+ onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow);
try {
future.get();
fail();
@@ -84,16 +81,16 @@ public class AsyncShutdownNow {
}
/**
- * Test shutdownNow with running task and thread blocked in a timed Future::get.
+ * Test shutdownNow with a running task and main thread blocked in a timed Future::get.
*/
- @Test(dataProvider = "executors")
- public void testTimedFutureGet(ExecutorService executor) throws Exception {
- System.out.format("testTimedFutureGet: %s%n", executor);
- try (executor) {
- Future<?> future = executor.submit(SLEEP_FOR_A_DAY);
+ @ParameterizedTest
+ @MethodSource("pools")
+ void testTimedFutureGet(ForkJoinPool pool) throws Exception {
+ try (pool) {
+ Future<?> future = pool.submit(SLEEP_FOR_A_DAY);
- // shutdownNow when main thread waits in ForkJoinTask.get
- onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow);
+ // shutdownNow when main thread waits in ForkJoinTask.awaitDone
+ onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow);
try {
future.get(1, TimeUnit.HOURS);
fail();
@@ -104,15 +101,15 @@ public class AsyncShutdownNow {
}
/**
- * Test shutdownNow with thread blocked in invokeAll.
+ * Test shutdownNow with running tasks and main thread blocked in invokeAll.
*/
- @Test(dataProvider = "executors")
- public void testInvokeAll(ExecutorService executor) throws Exception {
- System.out.format("testInvokeAll: %s%n", executor);
- try (executor) {
- // shutdownNow when main thread waits in ForkJoinTask.quietlyJoin
- onWait("java.util.concurrent.ForkJoinTask.quietlyJoin", executor::shutdownNow);
- List<Future<Void>> futures = executor.invokeAll(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
+ @ParameterizedTest
+ @MethodSource("pools")
+ void testInvokeAll(ForkJoinPool pool) throws Exception {
+ try (pool) {
+ // shutdownNow when main thread waits in ForkJoinTask.awaitDone
+ onWait("java.util.concurrent.ForkJoinTask.awaitDone", pool::shutdownNow);
+ List<Future<Void>> futures = pool.invokeAll(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
for (Future<Void> f : futures) {
assertTrue(f.isDone());
try {
@@ -126,16 +123,16 @@ public class AsyncShutdownNow {
}
/**
- * Test shutdownNow with thread blocked in invokeAny.
+ * Test shutdownNow with running tasks and main thread blocked in invokeAny.
*/
- @Test(dataProvider = "executors", enabled = false)
- public void testInvokeAny(ExecutorService executor) throws Exception {
- System.out.format("testInvokeAny: %s%n", executor);
- try (executor) {
+ @ParameterizedTest
+ @MethodSource("pools")
+ void testInvokeAny(ForkJoinPool pool) throws Exception {
+ try (pool) {
// shutdownNow when main thread waits in ForkJoinTask.get
- onWait("java.util.concurrent.ForkJoinTask.get", executor::shutdownNow);
+ onWait("java.util.concurrent.ForkJoinTask.get", pool::shutdownNow);
try {
- executor.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
+ pool.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
fail();
} catch (ExecutionException e) {
// expected
diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java
index 320f908cd3d..5b1dcfc2b34 100644
--- a/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java
+++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool19Test.java
@@ -249,14 +249,17 @@ public class ForkJoinPool19Test extends JSR166TestCase {
FailingFibAction(int n) { number = n; }
public void compute() {
int n = number;
- if (n <= 1)
- throw new FJException();
- else {
- FailingFibAction f1 = new FailingFibAction(n - 1);
- FailingFibAction f2 = new FailingFibAction(n - 2);
- invokeAll(f1, f2);
- result = f1.result + f2.result;
+ if (n > 1) {
+ try {
+ FailingFibAction f1 = new FailingFibAction(n - 1);
+ FailingFibAction f2 = new FailingFibAction(n - 2);
+ invokeAll(f1, f2);
+ result = f1.result + f2.result;
+ return;
+ } catch (CancellationException fallthrough) {
+ }
}
+ throw new FJException();
}
}
@@ -389,6 +392,7 @@ public class ForkJoinPool19Test extends JSR166TestCase {
}
f.quietlyJoin();
checkCancelled(f);
+ Thread.interrupted();
}};
checkInvoke(a);
a.reinitialize();
@@ -504,36 +508,78 @@ public class ForkJoinPool19Test extends JSR166TestCase {
* Implicitly closing a new pool using try-with-resources terminates it
*/
public void testClose() {
- ForkJoinTask f = new FibAction(8);
- ForkJoinPool pool = null;
- try (ForkJoinPool p = new ForkJoinPool()) {
- pool = p;
- p.execute(f);
- }
- checkCompletedNormally(f);
- assertTrue(pool != null && pool.isTerminated());
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ FibAction f = new FibAction(1);
+ ForkJoinPool pool = null;
+ try (ForkJoinPool p = new ForkJoinPool()) {
+ pool = p;
+ p.execute(f);
+ }
+ assertTrue(pool != null && pool.isTerminated());
+ f.join();
+ assertEquals(1, f.result);
+ }});
+ awaitTermination(t);
}
/**
- * Implicitly closing common pool using try-with-resources has no effect.
+ * Explicitly closing a new pool terminates it
*/
- public void testCloseCommonPool() {
- ForkJoinTask f = new FibAction(8);
- ForkJoinPool pool;
- try (ForkJoinPool p = pool = ForkJoinPool.commonPool()) {
- p.execute(f);
- }
+ public void testClose2() {
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ ForkJoinPool pool = new ForkJoinPool();
+ FibAction f = new FibAction(1);
+ pool.execute(f);
+ pool.close();
+ assertTrue(pool.isTerminated());
+ f.join();
+ assertEquals(1, f.result);
+ }});
+ awaitTermination(t);
+ }
- assertFalse(pool.isShutdown());
- assertFalse(pool.isTerminating());
- assertFalse(pool.isTerminated());
+ /**
+ * Explicitly closing a shutdown pool awaits termination
+ */
+ public void testClose3() {
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ ForkJoinPool pool = new ForkJoinPool();
+ FibAction f = new FibAction(1);
+ pool.execute(f);
+ pool.shutdown();
+ pool.close();
+ assertTrue(pool.isTerminated());
+ f.join();
+ assertEquals(1, f.result);
+ }});
+ awaitTermination(t);
+ }
+ /**
+ * Implicitly closing common pool using try-with-resources has no effect.
+ */
+ public void testCloseCommonPool() {
String prop = System.getProperty(
"java.util.concurrent.ForkJoinPool.common.parallelism");
- if (! "0".equals(prop)) {
- f.join();
- checkCompletedNormally(f);
- }
+ boolean nothreads = "0".equals(prop);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ ForkJoinTask f = new FibAction(8);
+ ForkJoinPool pool;
+ try (ForkJoinPool p = pool = ForkJoinPool.commonPool()) {
+ p.execute(f);
+ }
+ assertFalse(pool.isShutdown());
+ assertFalse(pool.isTerminating());
+ assertFalse(pool.isTerminated());
+ if (!nothreads) {
+ f.join();
+ checkCompletedNormally(f);
+ }
+ }});
+ awaitTermination(t);
}
-
}
diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java
index 9be6ce3d418..2bc5021fcef 100644
--- a/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java
+++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool8Test.java
@@ -232,14 +232,17 @@ public class ForkJoinPool8Test extends JSR166TestCase {
FailingFibAction(int n) { number = n; }
public void compute() {
int n = number;
- if (n <= 1)
- throw new FJException();
- else {
- FailingFibAction f1 = new FailingFibAction(n - 1);
- FailingFibAction f2 = new FailingFibAction(n - 2);
- invokeAll(f1, f2);
- result = f1.result + f2.result;
+ if (n > 1) {
+ try {
+ FailingFibAction f1 = new FailingFibAction(n - 1);
+ FailingFibAction f2 = new FailingFibAction(n - 2);
+ invokeAll(f1, f2);
+ result = f1.result + f2.result;
+ return;
+ } catch (CancellationException fallthrough) {
+ }
}
+ throw new FJException();
}
}
@@ -402,7 +405,9 @@ public class ForkJoinPool8Test extends JSR166TestCase {
try {
f.get(randomTimeout(), null);
shouldThrow();
- } catch (NullPointerException success) {}
+ } catch (NullPointerException success) {
+ f.join();
+ }
}};
checkInvoke(a);
}
diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
index 781b7bffe6d..7db3e20a2fb 100644
--- a/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
+++ b/test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java
@@ -627,6 +627,22 @@ public class ForkJoinPoolTest extends JSR166TestCase {
}
/**
+ * invoke throws a RuntimeException if task throws unchecked exception
+ */
+ public void testInvokeUncheckedException() throws Throwable {
+ ForkJoinPool p = new ForkJoinPool(1);
+ try (PoolCleaner cleaner = cleaner(p)) {
+ try {
+ p.invoke(ForkJoinTask.adapt(new Callable<Object>() {
+ public Object call() { throw new ArithmeticException(); }}));
+ shouldThrow();
+ } catch (RuntimeException success) {
+ assertTrue(success.getCause() instanceof ArithmeticException);
+ }
+ }
+ }
+
+ /**
* invokeAny(null) throws NullPointerException
*/
public void testInvokeAny1() throws Throwable {
diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java
index 4eaa9d2a3db..24989cb152b 100644
--- a/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java
+++ b/test/jdk/java/util/concurrent/tck/ForkJoinTaskTest.java
@@ -496,7 +496,9 @@ public class ForkJoinTaskTest extends JSR166TestCase {
try {
f.get(randomTimeout(), null);
shouldThrow();
- } catch (NullPointerException success) {}
+ } catch (NullPointerException success) {
+ f.join();
+ }
}};
testInvokeOnPool(mainPool(), a);
}
@@ -1245,7 +1247,9 @@ public class ForkJoinTaskTest extends JSR166TestCase {
try {
f.get(randomTimeout(), null);
shouldThrow();
- } catch (NullPointerException success) {}
+ } catch (NullPointerException success) {
+ f.join();
+ }
}};
testInvokeOnPool(singletonPool(), a);
}
diff --git a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java
index 1f5a6e09683..76ab189e553 100644
--- a/test/jdk/java/util/concurrent/tck/JSR166TestCase.java
+++ b/test/jdk/java/util/concurrent/tck/JSR166TestCase.java
@@ -129,6 +129,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
@@ -1669,11 +1670,20 @@ public class JSR166TestCase extends TestCase {
checkTimedGet(f, expectedValue, LONG_DELAY_MS);
}
+ // Avoids unwanted interrupts when run inder jtreg
+ static ThreadGroup topThreadGroup() {
+ for (ThreadGroup g = Thread.currentThread().getThreadGroup(), p; ; g = p)
+ if ((p = g.getParent()) == null)
+ return g;
+ }
+ static final ThreadGroup jsr166TestThreadGroup =
+ new ThreadGroup(topThreadGroup(), "jsr1666TestThreadGroup");
+
/**
* Returns a new started daemon Thread running the given runnable.
*/
Thread newStartedThread(Runnable runnable) {
- Thread t = new Thread(runnable);
+ Thread t = new Thread(jsr166TestThreadGroup, runnable);
t.setDaemon(true);
t.start();
return t;
@@ -1693,10 +1703,12 @@ public class JSR166TestCase extends TestCase {
* the thread (in the hope that it may terminate later) and fails.
*/
void awaitTermination(Thread thread, long timeoutMillis) {
- try {
- thread.join(timeoutMillis);
- } catch (InterruptedException fail) {
- threadUnexpectedException(fail);
+ for (;;) { // ignore stray interrupts by test harness
+ try {
+ thread.join(timeoutMillis);
+ break;
+ } catch (InterruptedException ignore) {
+ }
}
if (thread.getState() != Thread.State.TERMINATED) {
String detail = String.format(
@@ -1940,6 +1952,8 @@ public class JSR166TestCase extends TestCase {
@Override protected final void compute() {
try {
realCompute();
+ } catch (CancellationException ex) {
+ throw ex; // expected by some tests
} catch (Throwable fail) {
threadUnexpectedException(fail);
}
@@ -1955,6 +1969,8 @@ public class JSR166TestCase extends TestCase {
@Override protected final T compute() {
try {
return realCompute();
+ } catch (CancellationException ex) {
+ throw ex;
} catch (Throwable fail) {
threadUnexpectedException(fail);
}
diff --git a/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java b/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java
index 64115eec1c9..69bae253dd1 100644
--- a/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java
+++ b/test/jdk/java/util/concurrent/tck/RecursiveActionTest.java
@@ -162,10 +162,10 @@ public class RecursiveActionTest extends JSR166TestCase {
void checkCompletedAbnormally(RecursiveAction a, Throwable t) {
assertTrue(a.isDone());
- assertFalse(a.isCancelled());
assertFalse(a.isCompletedNormally());
assertTrue(a.isCompletedAbnormally());
- assertSame(t.getClass(), a.getException().getClass());
+ if (!a.isCancelled())
+ assertSame(t.getClass(), a.getException().getClass());
assertNull(a.getRawResult());
assertFalse(a.cancel(false));
assertFalse(a.cancel(true));
@@ -222,14 +222,17 @@ public class RecursiveActionTest extends JSR166TestCase {
FailingFibAction(int n) { number = n; }
public void compute() {
int n = number;
- if (n <= 1)
- throw new FJException();
- else {
- FailingFibAction f1 = new FailingFibAction(n - 1);
- FailingFibAction f2 = new FailingFibAction(n - 2);
- invokeAll(f1, f2);
- result = f1.result + f2.result;
+ if (n > 1) {
+ try {
+ FailingFibAction f1 = new FailingFibAction(n - 1);
+ FailingFibAction f2 = new FailingFibAction(n - 2);
+ invokeAll(f1, f2);
+ result = f1.result + f2.result;
+ return;
+ } catch (CancellationException fallthrough) {
+ }
}
+ throw new FJException();
}
}
@@ -488,7 +491,9 @@ public class RecursiveActionTest extends JSR166TestCase {
try {
f.get(randomTimeout(), null);
shouldThrow();
- } catch (NullPointerException success) {}
+ } catch (NullPointerException success) {
+ f.join();
+ }
}};
testInvokeOnPool(mainPool(), a);
}
diff --git a/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java b/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java
index 66745f4a199..348fef78a6f 100644
--- a/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java
+++ b/test/jdk/java/util/concurrent/tck/RecursiveTaskTest.java
@@ -178,10 +178,10 @@ public class RecursiveTaskTest extends JSR166TestCase {
void checkCompletedAbnormally(RecursiveTask<?> a, Throwable t) {
assertTrue(a.isDone());
- assertFalse(a.isCancelled());
assertFalse(a.isCompletedNormally());
assertTrue(a.isCompletedAbnormally());
- assertSame(t.getClass(), a.getException().getClass());
+ if (!a.isCancelled())
+ assertSame(t.getClass(), a.getException().getClass());
assertNull(a.getRawResult());
assertFalse(a.cancel(false));
assertFalse(a.cancel(true));
@@ -240,11 +240,15 @@ public class RecursiveTaskTest extends JSR166TestCase {
FailingFibTask(int n) { number = n; }
public Integer compute() {
int n = number;
- if (n <= 1)
- throw new FJException();
- FailingFibTask f1 = new FailingFibTask(n - 1);
- f1.fork();
- return new FibTask(n - 2).compute() + f1.join();
+ if (n > 1) {
+ try {
+ FailingFibTask f1 = new FailingFibTask(n - 1);
+ f1.fork();
+ return new FibTask(n - 2).compute() + f1.join();
+ } catch (CancellationException fallthrough) {
+ }
+ }
+ throw new FJException();
}
}
diff --git a/test/jdk/java/util/concurrent/tck/tck.policy b/test/jdk/java/util/concurrent/tck/tck.policy
index 66f79ab3e39..62018f792a3 100644
--- a/test/jdk/java/util/concurrent/tck/tck.policy
+++ b/test/jdk/java/util/concurrent/tck/tck.policy
@@ -1,6 +1,7 @@
grant {
// Permissions j.u.c. needs directly
permission java.lang.RuntimePermission "modifyThread";
+ permission java.lang.RuntimePermission "modifyThreadGroup";
permission java.lang.RuntimePermission "getClassLoader";
permission java.lang.RuntimePermission "setContextClassLoader";
permission java.util.PropertyPermission "*", "read";