< prev index next >

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Print this page
8246585: ForkJoin updates
Reviewed-by: martin

*** 36,52 **** package java.util.concurrent; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; - import java.lang.ref.ReferenceQueue; - import java.lang.ref.WeakReference; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.List; import java.util.RandomAccess; ! import java.util.concurrent.locks.ReentrantLock; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. * A {@code ForkJoinTask} is a thread-like entity that is much * lighter weight than a normal thread. Huge numbers of tasks and --- 36,50 ---- package java.util.concurrent; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.List; import java.util.RandomAccess; ! import java.util.concurrent.locks.LockSupport; /** * Abstract base class for tasks that run within a {@link ForkJoinPool}. * A {@code ForkJoinTask} is a thread-like entity that is much * lighter weight than a normal thread. Huge numbers of tasks and
*** 215,566 **** * (1) basic status maintenance * (2) execution and awaiting completion * (3) user-level methods that additionally report results. * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. */ /** ! * The status field holds run control status bits packed into a ! * single int to ensure atomicity. Status is initially zero, and ! * takes on nonnegative values until completed, upon which it ! * holds (sign bit) DONE, possibly with ABNORMAL (cancelled or ! * exceptional) and THROWN (in which case an exception has been ! * stored). Tasks with dependent blocked waiting joiners have the ! * SIGNAL bit set. Completion of a task with SIGNAL set awakens ! * any waiters via notifyAll. (Waiters also help signal others ! * upon completion.) ! * ! * These control bits occupy only (some of) the upper half (16 ! * bits) of status field. The lower bits are used for user-defined ! * tags. ! */ ! volatile int status; // accessed directly by pool and workers private static final int DONE = 1 << 31; // must be negative ! private static final int ABNORMAL = 1 << 18; // set atomically with DONE ! private static final int THROWN = 1 << 17; // set atomically with ABNORMAL ! private static final int SIGNAL = 1 << 16; // true if joiner waiting private static final int SMASK = 0xffff; // short bits for tags ! /** ! * Constructor for subclasses to call. ! */ ! public ForkJoinTask() {} ! ! static boolean isExceptionalStatus(int s) { // needed by subclasses ! return (s & THROWN) != 0; ! } ! /** ! * Sets DONE status and wakes up threads waiting to join this task. ! * ! * @return status on exit ! */ ! private int setDone() { ! int s; ! if (((s = (int)STATUS.getAndBitwiseOr(this, DONE)) & SIGNAL) != 0) ! synchronized (this) { notifyAll(); } ! return s | DONE; } ! ! /** ! * Marks cancelled or exceptional completion unless already done. ! * ! * @param completion must be DONE | ABNORMAL, ORed with THROWN if exceptional ! * @return status on exit ! */ ! private int abnormalCompletion(int completion) { ! for (int s, ns;;) { ! if ((s = status) < 0) ! return s; ! else if (STATUS.weakCompareAndSet(this, s, ns = s | completion)) { ! if ((s & SIGNAL) != 0) ! synchronized (this) { notifyAll(); } ! return ns; } } } /** ! * Primary execution method for stolen tasks. Unless done, calls ! * exec and records status if completed, but doesn't wait for ! * completion otherwise. * ! * @return status on exit from this method */ ! final int doExec() { ! int s; boolean completed; ! if ((s = status) >= 0) { ! try { ! completed = exec(); ! } catch (Throwable rex) { ! completed = false; ! s = setExceptionalCompletion(rex); } ! if (completed) ! s = setDone(); } ! return s; } - - /** - * If not done, sets SIGNAL status and performs Object.wait(timeout). - * This task may or may not be done on exit. Ignores interrupts. - * - * @param timeout using Object.wait conventions. - */ - final void internalWait(long timeout) { - if ((int)STATUS.getAndBitwiseOr(this, SIGNAL) >= 0) { - synchronized (this) { - if (status >= 0) - try { wait(timeout); } catch (InterruptedException ie) { } else ! notifyAll(); } } ! } ! ! /** ! * Blocks a non-worker-thread until completion. ! * @return status upon completion ! */ ! private int externalAwaitDone() { ! int s = tryExternalHelp(); ! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { ! boolean interrupted = false; ! synchronized (this) { ! for (;;) { ! if ((s = status) >= 0) { try { ! wait(0L); ! } catch (InterruptedException ie) { ! interrupted = true; } } ! else { ! notifyAll(); ! break; } } } if (interrupted) Thread.currentThread().interrupt(); } return s; } /** ! * Blocks a non-worker-thread until completion or interruption. */ ! private int externalInterruptibleAwaitDone() throws InterruptedException { ! int s = tryExternalHelp(); ! if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { ! synchronized (this) { ! for (;;) { ! if ((s = status) >= 0) ! wait(0L); ! else { ! notifyAll(); ! break; ! } ! } ! } ! } ! else if (Thread.interrupted()) ! throw new InterruptedException(); return s; } /** ! * Tries to help with tasks allowed for external callers. ! * ! * @return current status */ ! private int tryExternalHelp() { int s; ! return ((s = status) < 0 ? s: ! (this instanceof CountedCompleter) ? ! ForkJoinPool.common.externalHelpComplete( ! (CountedCompleter<?>)this, 0) : ! ForkJoinPool.common.tryExternalUnpush(this) ? ! doExec() : 0); } /** ! * Implementation for join, get, quietlyJoin. Directly handles ! * only cases of already-completed, external wait, and ! * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * ! * @return status upon completion */ ! private int doJoin() { ! int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; ! return (s = status) < 0 ? s : ! ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! (w = (wt = (ForkJoinWorkerThread)t).workQueue). ! tryUnpush(this) && (s = doExec()) < 0 ? s : ! wt.pool.awaitJoin(w, this, 0L) : ! externalAwaitDone(); } /** ! * Implementation for invoke, quietlyInvoke. * ! * @return status upon completion */ ! private int doInvoke() { ! int s; Thread t; ForkJoinWorkerThread wt; ! return (s = doExec()) < 0 ? s : ! ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! (wt = (ForkJoinWorkerThread)t).pool. ! awaitJoin(wt.workQueue, this, 0L) : ! externalAwaitDone(); } - // Exception table support - /** ! * Hash table of exceptions thrown by tasks, to enable reporting ! * by callers. Because exceptions are rare, we don't directly keep ! * them with task objects, but instead use a weak ref table. Note ! * that cancellation exceptions don't appear in the table, but are ! * instead recorded as status values. ! * ! * The exception table has a fixed capacity. */ ! private static final ExceptionNode[] exceptionTable ! = new ExceptionNode[32]; ! ! /** Lock protecting access to exceptionTable. */ ! private static final ReentrantLock exceptionTableLock ! = new ReentrantLock(); ! ! /** Reference queue of stale exceptionally completed tasks. */ ! private static final ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue ! = new ReferenceQueue<>(); ! /** ! * Key-value nodes for exception table. The chained hash table ! * uses identity comparisons, full locking, and weak references ! * for keys. The table has a fixed capacity because it only ! * maintains task exceptions long enough for joiners to access ! * them, so should never become very large for sustained ! * periods. However, since we do not know when the last joiner ! * completes, we must use weak references and expunge them. We do ! * so on each operation (hence full locking). Also, some thread in ! * any ForkJoinPool will call helpExpungeStaleExceptions when its ! * pool becomes isQuiescent. ! */ ! static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { ! final Throwable ex; ! ExceptionNode next; ! final long thrower; // use id not ref to avoid weak cycles ! final int hashCode; // store task hashCode before weak ref disappears ! ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next, ! ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue) { ! super(task, exceptionTableRefQueue); ! this.ex = ex; ! this.next = next; ! this.thrower = Thread.currentThread().getId(); ! this.hashCode = System.identityHashCode(task); ! } } /** ! * Records exception and sets status. * ! * @return status on exit */ ! final int recordExceptionalCompletion(Throwable ex) { ! int s; if ((s = status) >= 0) { - int h = System.identityHashCode(this); - final ReentrantLock lock = exceptionTableLock; - lock.lock(); try { ! expungeStaleExceptions(); ! ExceptionNode[] t = exceptionTable; ! int i = h & (t.length - 1); ! for (ExceptionNode e = t[i]; ; e = e.next) { ! if (e == null) { ! t[i] = new ExceptionNode(this, ex, t[i], ! exceptionTableRefQueue); ! break; ! } ! if (e.get() == this) // already present ! break; ! } ! } finally { ! lock.unlock(); } ! s = abnormalCompletion(DONE | ABNORMAL | THROWN); } return s; } /** ! * Records exception and possibly propagates. * ! * @return status on exit ! */ ! private int setExceptionalCompletion(Throwable ex) { ! int s = recordExceptionalCompletion(ex); ! if ((s & THROWN) != 0) ! internalPropagateException(ex); ! return s; } ! ! /** ! * Hook for exception propagation support for tasks with completers. ! */ ! void internalPropagateException(Throwable ex) { } ! ! /** ! * Cancels, ignoring any exceptions thrown by cancel. Used during ! * worker and pool shutdown. Cancel is spec'ed not to throw any ! * exceptions, but if it does anyway, we have no recourse during ! * shutdown, so guard against this case. ! */ ! static final void cancelIgnoringExceptions(ForkJoinTask<?> t) { ! if (t != null && t.status >= 0) { ! try { ! t.cancel(false); ! } catch (Throwable ignore) { } } } /** ! * Removes exception node and clears status. */ ! private void clearExceptionalCompletion() { ! int h = System.identityHashCode(this); ! final ReentrantLock lock = exceptionTableLock; ! lock.lock(); try { ! ExceptionNode[] t = exceptionTable; ! int i = h & (t.length - 1); ! ExceptionNode e = t[i]; ! ExceptionNode pred = null; ! while (e != null) { ! ExceptionNode next = e.next; ! if (e.get() == this) { ! if (pred == null) ! t[i] = next; ! else ! pred.next = next; ! break; } - pred = e; - e = next; - } - expungeStaleExceptions(); - status = 0; - } finally { - lock.unlock(); } } /** * Returns a rethrowable exception for this task, if available. --- 213,529 ---- * (1) basic status maintenance * (2) execution and awaiting completion * (3) user-level methods that additionally report results. * This is sometimes hard to see because this file orders exported * methods in a way that flows well in javadocs. + * + * Revision notes: The use of "Aux" field replaces previous + * reliance on a table to hold exceptions and synchronized blocks + * and monitors to wait for completion. */ /** ! * Nodes for threads waiting for completion, or holding a thrown ! * exception (never both). Waiting threads prepend nodes ! * Treiber-stack-style. Signallers detach and unpark ! * waiters. Cancelled waiters try to unsplice. ! */ ! static final class Aux { ! final Thread thread; ! final Throwable ex; // null if a waiter ! Aux next; // accessed only via memory-acquire chains ! Aux(Thread thread, Throwable ex) { ! this.thread = thread; ! this.ex = ex; ! } ! final boolean casNext(Aux c, Aux v) { // used only in cancellation ! return NEXT.compareAndSet(this, c, v); ! } ! private static final VarHandle NEXT; ! static { ! try { ! NEXT = MethodHandles.lookup() ! .findVarHandle(Aux.class, "next", Aux.class); ! } catch (ReflectiveOperationException e) { ! throw new ExceptionInInitializerError(e); ! } ! } ! } + /* + * The status field holds bits packed into a single int to ensure + * atomicity. Status is initially zero, and takes on nonnegative + * values until completed, upon which it holds (sign bit) DONE, + * possibly with ABNORMAL (cancelled or exceptional) and THROWN + * (in which case an exception has been stored). A value of + * ABNORMAL without DONE signifies an interrupted wait. These + * control bits occupy only (some of) the upper half (16 bits) of + * status field. The lower bits are used for user-defined tags. + */ private static final int DONE = 1 << 31; // must be negative ! private static final int ABNORMAL = 1 << 16; ! private static final int THROWN = 1 << 17; private static final int SMASK = 0xffff; // short bits for tags + private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel ! // Fields ! volatile int status; // accessed directly by pool and workers ! private transient volatile Aux aux; // either waiters or thrown Exception ! // Support for atomic operations ! private static final VarHandle STATUS; ! private static final VarHandle AUX; ! private int getAndBitwiseOrStatus(int v) { ! return (int)STATUS.getAndBitwiseOr(this, v); ! } ! private boolean casStatus(int c, int v) { ! return STATUS.weakCompareAndSet(this, c, v); ! } ! private boolean casAux(Aux c, Aux v) { ! return AUX.compareAndSet(this, c, v); ! } ! ! /** Removes and unparks waiters */ ! private void signalWaiters() { ! for (Aux a; (a = aux) != null && a.ex == null; ) { ! if (casAux(a, null)) { // detach entire list ! for (Thread t; a != null; a = a.next) { ! if ((t = a.thread) != Thread.currentThread() && t != null) ! LockSupport.unpark(t); // don't self-signal } ! break; } } } /** ! * Possibly blocks until task is done or interrupted or timed out. * ! * @param interruptible true if wait can be cancelled by interrupt ! * @param deadline if non-zero use timed waits and possibly timeout ! * @param pool if nonnull pool to uncompensate after unblocking ! * @return status on exit, or ABNORMAL if interrupted while waiting */ ! private int awaitDone(boolean interruptible, long deadline, ! ForkJoinPool pool) { ! int s; ! boolean interrupted = false, queued = false, parked = false; ! Aux node = null; ! while ((s = status) >= 0) { ! Aux a; long ns; ! if (parked && Thread.interrupted()) { ! if (interruptible) { ! s = ABNORMAL; ! break; } ! interrupted = true; } ! else if (queued) { ! if (deadline != 0L) { ! if ((ns = deadline - System.nanoTime()) <= 0L) ! break; ! LockSupport.parkNanos(ns); } else ! LockSupport.park(); ! parked = true; } + else if (node != null) { + if ((a = aux) != null && a.ex != null) + Thread.onSpinWait(); // exception in progress + else if (queued = casAux(node.next = a, node)) + LockSupport.setCurrentBlocker(this); } ! else { try { ! node = new Aux(Thread.currentThread(), null); ! } catch (Throwable ex) { // try to cancel if cannot create ! casStatus(s, s | (DONE | ABNORMAL)); } } ! } ! if (pool != null) ! pool.uncompensate(); ! ! if (queued) { ! LockSupport.setCurrentBlocker(null); ! if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer ! outer: for (Aux a; (a = aux) != null && a.ex == null; ) { ! for (Aux trail = null;;) { ! Aux next = a.next; ! if (a == node) { ! if (trail != null) ! trail.casNext(trail, next); ! else if (casAux(a, next)) ! break outer; // cannot be re-encountered ! break; // restart ! } else { ! trail = a; ! if ((a = next) == null) ! break outer; ! } } } } + else { + signalWaiters(); // help clean or signal if (interrupted) Thread.currentThread().interrupt(); } + } return s; } /** ! * Sets DONE status and wakes up threads waiting to join this task. ! * @return status on exit */ ! private int setDone() { ! int s = getAndBitwiseOrStatus(DONE) | DONE; ! signalWaiters(); return s; } /** ! * Sets ABNORMAL DONE status unless already done, and wakes up threads ! * waiting to join this task. ! * @return status on exit */ ! private int trySetCancelled() { int s; ! do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL))); ! signalWaiters(); ! return s; } /** ! * Records exception and sets ABNORMAL THROWN DONE status unless ! * already done, and wakes up threads waiting to join this task. ! * If losing a race with setDone or trySetCancelled, the exception ! * may be recorded but not reported. * ! * @return status on exit */ ! final int trySetThrown(Throwable ex) { ! Aux h = new Aux(Thread.currentThread(), ex), p = null; ! boolean installed = false; ! int s; ! while ((s = status) >= 0) { ! Aux a; ! if (!installed && ((a = aux) == null || a.ex == null) && ! (installed = casAux(a, h))) ! p = a; // list of waiters replaced by h ! if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN))) ! break; ! } ! for (; p != null; p = p.next) ! LockSupport.unpark(p.thread); ! return s; } /** ! * Records exception unless already done. Overridable in subclasses. * ! * @return status on exit */ ! int trySetException(Throwable ex) { ! return trySetThrown(ex); } /** ! * Constructor for subclasses to call. */ ! public ForkJoinTask() {} ! static boolean isExceptionalStatus(int s) { // needed by subclasses ! return (s & THROWN) != 0; } /** ! * Unless done, calls exec and records status if completed, but ! * doesn't wait for completion otherwise. * ! * @return status on exit from this method */ ! final int doExec() { ! int s; boolean completed; if ((s = status) >= 0) { try { ! completed = exec(); ! } catch (Throwable rex) { ! s = trySetException(rex); ! completed = false; } ! if (completed) ! s = setDone(); } return s; } /** ! * Helps and/or waits for completion from join, get, or invoke; ! * called from either internal or external threads. * ! * @param ran true if task known to have been exec'd ! * @param interruptible true if park interruptibly when external ! * @param timed true if use timed wait ! * @param nanos if timed, timeout value ! * @return ABNORMAL if interrupted, else status on exit ! */ ! private int awaitJoin(boolean ran, boolean interruptible, boolean timed, ! long nanos) { ! boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; ! Thread t; ForkJoinWorkerThread wt; ! if (internal = ((t = Thread.currentThread()) ! instanceof ForkJoinWorkerThread)) { ! p = (wt = (ForkJoinWorkerThread)t).pool; ! q = wt.workQueue; } ! else { ! p = ForkJoinPool.common; ! q = ForkJoinPool.commonQueue(); ! if (interruptible && Thread.interrupted()) ! return ABNORMAL; } ! if ((s = status) < 0) ! return s; ! long deadline = 0L; ! if (timed) { ! if (nanos <= 0L) ! return 0; ! else if ((deadline = nanos + System.nanoTime()) == 0L) ! deadline = 1L; ! } ! ForkJoinPool uncompensate = null; ! if (q != null && p != null) { // try helping ! if ((!timed || p.isSaturated()) && ! ((this instanceof CountedCompleter) ? ! (s = p.helpComplete(this, q, internal)) < 0 : ! (q.tryRemove(this, internal) && (s = doExec()) < 0))) ! return s; ! if (internal) { ! if ((s = p.helpJoin(this, q)) < 0) ! return s; ! if (s == UNCOMPENSATE) ! uncompensate = p; ! interruptible = false; } } + return awaitDone(interruptible, deadline, uncompensate); } /** ! * Cancels, ignoring any exceptions thrown by cancel. Cancel is ! * spec'ed not to throw any exceptions, but if it does anyway, we ! * have no recourse, so guard against this case. */ ! static final void cancelIgnoringExceptions(Future<?> t) { ! if (t != null) { try { ! t.cancel(true); ! } catch (Throwable ignore) { } } } /** * Returns a rethrowable exception for this task, if available.
*** 575,689 **** * trace. * * @return the exception, or null if none */ private Throwable getThrowableException() { ! int h = System.identityHashCode(this); ! ExceptionNode e; ! final ReentrantLock lock = exceptionTableLock; ! lock.lock(); try { ! expungeStaleExceptions(); ! ExceptionNode[] t = exceptionTable; ! e = t[h & (t.length - 1)]; ! while (e != null && e.get() != this) ! e = e.next; ! } finally { ! lock.unlock(); ! } ! Throwable ex; ! if (e == null || (ex = e.ex) == null) ! return null; ! if (e.thrower != Thread.currentThread().getId()) { ! try { ! Constructor<?> noArgCtor = null; ! // public ctors only for (Constructor<?> c : ex.getClass().getConstructors()) { Class<?>[] ps = c.getParameterTypes(); if (ps.length == 0) noArgCtor = c; ! else if (ps.length == 1 && ps[0] == Throwable.class) ! return (Throwable)c.newInstance(ex); } ! if (noArgCtor != null) { ! Throwable wx = (Throwable)noArgCtor.newInstance(); ! wx.initCause(ex); ! return wx; } } catch (Exception ignore) { } } return ex; } /** ! * Polls stale refs and removes them. Call only while holding lock. */ ! private static void expungeStaleExceptions() { ! for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { ! if (x instanceof ExceptionNode) { ! ExceptionNode[] t = exceptionTable; ! int i = ((ExceptionNode)x).hashCode & (t.length - 1); ! ExceptionNode e = t[i]; ! ExceptionNode pred = null; ! while (e != null) { ! ExceptionNode next = e.next; ! if (e == x) { ! if (pred == null) ! t[i] = next; ! else ! pred.next = next; ! break; ! } ! pred = e; ! e = next; ! } ! } ! } } /** ! * If lock is available, polls stale refs and removes them. ! * Called from ForkJoinPool when pools become quiescent. */ ! static final void helpExpungeStaleExceptions() { ! final ReentrantLock lock = exceptionTableLock; ! if (lock.tryLock()) { ! try { ! expungeStaleExceptions(); ! } finally { ! lock.unlock(); ! } } } /** ! * A version of "sneaky throw" to relay exceptions. */ static void rethrow(Throwable ex) { ForkJoinTask.<RuntimeException>uncheckedThrow(ex); } /** * The sneaky part of sneaky throw, relying on generics * limitations to evade compiler complaints about rethrowing ! * unchecked exceptions. */ @SuppressWarnings("unchecked") static <T extends Throwable> void uncheckedThrow(Throwable t) throws T { ! if (t != null) throw (T)t; // rely on vacuous cast - else - throw new Error("Unknown Exception"); - } - - /** - * Throws exception, if any, associated with the given status. - */ - private void reportException(int s) { - rethrow((s & THROWN) != 0 ? getThrowableException() : - new CancellationException()); } // public methods /** --- 538,629 ---- * trace. * * @return the exception, or null if none */ private Throwable getThrowableException() { ! Throwable ex; Aux a; ! if ((a = aux) == null) ! ex = null; ! else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) { try { ! Constructor<?> noArgCtor = null, oneArgCtor = null; for (Constructor<?> c : ex.getClass().getConstructors()) { Class<?>[] ps = c.getParameterTypes(); if (ps.length == 0) noArgCtor = c; ! else if (ps.length == 1 && ps[0] == Throwable.class) { ! oneArgCtor = c; ! break; } ! } ! if (oneArgCtor != null) ! ex = (Throwable)oneArgCtor.newInstance(ex); ! else if (noArgCtor != null) { ! Throwable rx = (Throwable)noArgCtor.newInstance(); ! rx.initCause(ex); ! ex = rx; } } catch (Exception ignore) { } } return ex; } /** ! * Returns exception associated with the given status, or null if none. */ ! private Throwable getException(int s) { ! Throwable ex = null; ! if ((s & ABNORMAL) != 0 && ! ((s & THROWN) == 0 || (ex = getThrowableException()) == null)) ! ex = new CancellationException(); ! return ex; } /** ! * Throws exception associated with the given status, or ! * CancellationException if none recorded. */ ! private void reportException(int s) { ! ForkJoinTask.<RuntimeException>uncheckedThrow( ! (s & THROWN) != 0 ? getThrowableException() : null); } + + /** + * Throws exception for (timed or untimed) get, wrapping if + * necessary in an ExecutionException. + */ + private void reportExecutionException(int s) { + Throwable ex = null; + if (s == ABNORMAL) + ex = new InterruptedException(); + else if (s >= 0) + ex = new TimeoutException(); + else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null) + ex = new ExecutionException(ex); + ForkJoinTask.<RuntimeException>uncheckedThrow(ex); } /** ! * A version of "sneaky throw" to relay exceptions in other ! * contexts. */ static void rethrow(Throwable ex) { ForkJoinTask.<RuntimeException>uncheckedThrow(ex); } /** * The sneaky part of sneaky throw, relying on generics * limitations to evade compiler complaints about rethrowing ! * unchecked exceptions. If argument null, throws ! * CancellationException. */ @SuppressWarnings("unchecked") static <T extends Throwable> void uncheckedThrow(Throwable t) throws T { ! if (t == null) ! t = new CancellationException(); throw (T)t; // rely on vacuous cast } // public methods /**
*** 700,712 **** * true}. * * @return {@code this}, to simplify usage */ public final ForkJoinTask<V> fork() { ! Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ! ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } --- 640,652 ---- * true}. * * @return {@code this}, to simplify usage */ public final ForkJoinTask<V> fork() { ! Thread t; ForkJoinWorkerThread w; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ! (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool); else ForkJoinPool.common.externalPush(this); return this; }
*** 721,731 **** * * @return the computed result */ public final V join() { int s; ! if (((s = doJoin()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } /** --- 661,673 ---- * * @return the computed result */ public final V join() { int s; ! if ((s = status) >= 0) ! s = awaitJoin(false, false, false, 0L); ! if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } /**
*** 736,746 **** * * @return the computed result */ public final V invoke() { int s; ! if (((s = doInvoke()) & ABNORMAL) != 0) reportException(s); return getRawResult(); } /** --- 678,690 ---- * * @return the computed result */ public final V invoke() { int s; ! if ((s = doExec()) >= 0) ! s = awaitJoin(true, false, false, 0L); ! if ((s & ABNORMAL) != 0) reportException(s); return getRawResult(); } /**
*** 760,773 **** * @param t2 the second task * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; t2.fork(); ! if (((s1 = t1.doInvoke()) & ABNORMAL) != 0) t1.reportException(s1); ! if (((s2 = t2.doJoin()) & ABNORMAL) != 0) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for --- 704,723 ---- * @param t2 the second task * @throws NullPointerException if any task is null */ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { int s1, s2; + if (t1 == null || t2 == null) + throw new NullPointerException(); t2.fork(); ! if ((s1 = t1.doExec()) >= 0) ! s1 = t1.awaitJoin(true, false, false, 0L); ! if ((s1 & ABNORMAL) != 0) { ! cancelIgnoringExceptions(t2); t1.reportException(s1); ! } ! else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0) t2.reportException(s2); } /** * Forks the given tasks, returning when {@code isDone} holds for
*** 786,817 **** */ public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ! ForkJoinTask<?> t = tasks[i]; ! if (t == null) { ! if (ex == null) ex = new NullPointerException(); } - else if (i != 0) t.fork(); - else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) - ex = t.getException(); } for (int i = 1; i <= last; ++i) { ! ForkJoinTask<?> t = tasks[i]; ! if (t != null) { ! if (ex != null) ! t.cancel(false); ! else if ((t.doJoin() & ABNORMAL) != 0) ! ex = t.getException(); } } ! if (ex != null) rethrow(ex); } /** * Forks all tasks in the specified collection, returning when * {@code isDone} holds for each task or an (unchecked) exception * is encountered, in which case the exception is rethrown. If --- 736,778 ---- */ public static void invokeAll(ForkJoinTask<?>... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ! ForkJoinTask<?> t; ! if ((t = tasks[i]) == null) { ex = new NullPointerException(); + break; + } + if (i == 0) { + int s; + if ((s = t.doExec()) >= 0) + s = t.awaitJoin(true, false, false, 0L); + if ((s & ABNORMAL) != 0) + ex = t.getException(s); + break; } t.fork(); } + if (ex == null) { for (int i = 1; i <= last; ++i) { ! ForkJoinTask<?> t; ! if ((t = tasks[i]) != null) { ! int s; ! if ((s = t.status) >= 0) ! s = t.awaitJoin(false, false, false, 0L); ! if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) ! break; } } ! } ! if (ex != null) { ! for (int i = 1; i <= last; ++i) ! cancelIgnoringExceptions(tasks[i]); rethrow(ex); } + } /** * Forks all tasks in the specified collection, returning when * {@code isDone} holds for each task or an (unchecked) exception * is encountered, in which case the exception is rethrown. If
*** 836,868 **** } @SuppressWarnings("unchecked") List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks; Throwable ex = null; ! int last = ts.size() - 1; for (int i = last; i >= 0; --i) { ! ForkJoinTask<?> t = ts.get(i); ! if (t == null) { ! if (ex == null) ex = new NullPointerException(); } - else if (i != 0) t.fork(); - else if ((t.doInvoke() & ABNORMAL) != 0 && ex == null) - ex = t.getException(); } for (int i = 1; i <= last; ++i) { ! ForkJoinTask<?> t = ts.get(i); ! if (t != null) { ! if (ex != null) ! t.cancel(false); ! else if ((t.doJoin() & ABNORMAL) != 0) ! ex = t.getException(); } } ! if (ex != null) rethrow(ex); return tasks; } /** * Attempts to cancel execution of this task. This attempt will --- 797,840 ---- } @SuppressWarnings("unchecked") List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks; Throwable ex = null; ! int last = ts.size() - 1; // nearly same as array version for (int i = last; i >= 0; --i) { ! ForkJoinTask<?> t; ! if ((t = ts.get(i)) == null) { ex = new NullPointerException(); + break; + } + if (i == 0) { + int s; + if ((s = t.doExec()) >= 0) + s = t.awaitJoin(true, false, false, 0L); + if ((s & ABNORMAL) != 0) + ex = t.getException(s); + break; } t.fork(); } + if (ex == null) { for (int i = 1; i <= last; ++i) { ! ForkJoinTask<?> t; ! if ((t = ts.get(i)) != null) { ! int s; ! if ((s = t.status) >= 0) ! s = t.awaitJoin(false, false, false, 0L); ! if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null) ! break; } } ! } ! if (ex != null) { ! for (int i = 1; i <= last; ++i) ! cancelIgnoringExceptions(ts.get(i)); rethrow(ex); + } return tasks; } /** * Attempts to cancel execution of this task. This attempt will
*** 890,901 **** * control cancellation. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { ! int s = abnormalCompletion(DONE | ABNORMAL); ! return (s & (ABNORMAL | THROWN)) == ABNORMAL; } public final boolean isDone() { return status < 0; } --- 862,872 ---- * control cancellation. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { ! return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL; } public final boolean isDone() { return status < 0; }
*** 930,943 **** * none or if the method has not yet completed. * * @return the exception, or {@code null} if none */ public final Throwable getException() { ! int s = status; ! return ((s & ABNORMAL) == 0 ? null : ! (s & THROWN) == 0 ? new CancellationException() : ! getThrowableException()); } /** * Completes this task abnormally, and if not already aborted or * cancelled, causes it to throw the given exception upon --- 901,911 ---- * none or if the method has not yet completed. * * @return the exception, or {@code null} if none */ public final Throwable getException() { ! return getException(status); } /** * Completes this task abnormally, and if not already aborted or * cancelled, causes it to throw the given exception upon
*** 951,961 **** * @param ex the exception to throw. If this exception is not a * {@code RuntimeException} or {@code Error}, the actual exception * thrown will be a {@code RuntimeException} with cause {@code ex}. */ public void completeExceptionally(Throwable ex) { ! setExceptionalCompletion((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } /** --- 919,929 ---- * @param ex the exception to throw. If this exception is not a * {@code RuntimeException} or {@code Error}, the actual exception * thrown will be a {@code RuntimeException} with cause {@code ex}. */ public void completeExceptionally(Throwable ex) { ! trySetException((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } /**
*** 973,983 **** */ public void complete(V value) { try { setRawResult(value); } catch (Throwable rex) { ! setExceptionalCompletion(rex); return; } setDone(); } --- 941,951 ---- */ public void complete(V value) { try { setRawResult(value); } catch (Throwable rex) { ! trySetException(rex); return; } setDone(); }
*** 1003,1019 **** * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? ! doJoin() : externalInterruptibleAwaitDone(); ! if ((s & THROWN) != 0) ! throw new ExecutionException(getThrowableException()); ! else if ((s & ABNORMAL) != 0) ! throw new CancellationException(); ! else return getRawResult(); } /** * Waits if necessary for at most the given time for the computation --- 971,983 ---- * exception * @throws InterruptedException if the current thread is not a * member of a ForkJoinPool and was interrupted while waiting */ public final V get() throws InterruptedException, ExecutionException { ! int s; ! if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0) ! reportExecutionException(s); return getRawResult(); } /** * Waits if necessary for at most the given time for the computation
*** 1030,1114 **** * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { int s; ! long nanos = unit.toNanos(timeout); ! if (Thread.interrupted()) ! throw new InterruptedException(); ! if ((s = status) >= 0 && nanos > 0L) { ! long d = System.nanoTime() + nanos; ! long deadline = (d == 0L) ? 1L : d; // avoid 0 ! Thread t = Thread.currentThread(); ! if (t instanceof ForkJoinWorkerThread) { ! ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; ! s = wt.pool.awaitJoin(wt.workQueue, this, deadline); ! } ! else if ((s = ((this instanceof CountedCompleter) ? ! ForkJoinPool.common.externalHelpComplete( ! (CountedCompleter<?>)this, 0) : ! ForkJoinPool.common.tryExternalUnpush(this) ? ! doExec() : 0)) >= 0) { ! long ns, ms; // measure in nanosecs, but wait in millisecs ! while ((s = status) >= 0 && ! (ns = deadline - System.nanoTime()) > 0L) { ! if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L && ! (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) { ! synchronized (this) { ! if (status >= 0) ! wait(ms); // OK to throw InterruptedException ! else ! notifyAll(); ! } ! } ! } ! } ! } ! if (s >= 0) ! throw new TimeoutException(); ! else if ((s & THROWN) != 0) ! throw new ExecutionException(getThrowableException()); ! else if ((s & ABNORMAL) != 0) ! throw new CancellationException(); ! else return getRawResult(); } /** * Joins this task, without returning its result or throwing its * exception. This method may be useful when processing * collections of tasks when some have been cancelled or otherwise * known to have aborted. */ public final void quietlyJoin() { ! doJoin(); } /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its * exception. */ public final void quietlyInvoke() { ! doInvoke(); } /** * Possibly executes tasks until the pool hosting the current task * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This * method may be of use in designs in which many tasks are forked, * but none are explicitly joined, instead executing them until * all are processed. */ public static void helpQuiesce() { ! Thread t; ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { ! ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; ! wt.pool.helpQuiescePool(wt.workQueue); ! } else ! ForkJoinPool.quiesceCommonPool(); } /** * Resets the internal bookkeeping state of this task, allowing a * subsequent {@code fork}. This method allows repeated reuse of --- 994,1044 ---- * @throws TimeoutException if the wait timed out */ public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { int s; ! if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 || ! (s & ABNORMAL) != 0) ! reportExecutionException(s); return getRawResult(); } /** * Joins this task, without returning its result or throwing its * exception. This method may be useful when processing * collections of tasks when some have been cancelled or otherwise * known to have aborted. */ public final void quietlyJoin() { ! if (status >= 0) ! awaitJoin(false, false, false, 0L); } /** * Commences performing this task and awaits its completion if * necessary, without returning its result or throwing its * exception. */ public final void quietlyInvoke() { ! if (doExec() >= 0) ! awaitJoin(true, false, false, 0L); } /** * Possibly executes tasks until the pool hosting the current task * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This * method may be of use in designs in which many tasks are forked, * but none are explicitly joined, instead executing them until * all are processed. */ public static void helpQuiesce() { ! Thread t; ForkJoinWorkerThread w; ForkJoinPool p; ! if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && ! (p = (w = (ForkJoinWorkerThread)t).pool) != null) ! p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false); else ! ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false); } /** * Resets the internal bookkeeping state of this task, allowing a * subsequent {@code fork}. This method allows repeated reuse of
*** 1124,1136 **** * null}. However, the value returned by {@code getRawResult} is * unaffected. To clear this value, you can invoke {@code * setRawResult(null)}. */ public void reinitialize() { ! if ((status & THROWN) != 0) ! clearExceptionalCompletion(); ! else status = 0; } /** * Returns the pool hosting the current thread, or {@code null} --- 1054,1064 ---- * null}. However, the value returned by {@code getRawResult} is * unaffected. To clear this value, you can invoke {@code * setRawResult(null)}. */ public void reinitialize() { ! aux = null; status = 0; } /** * Returns the pool hosting the current thread, or {@code null}
*** 1140,1152 **** * #inForkJoinPool} returns {@code false}. * * @return the pool, or {@code null} if none */ public static ForkJoinPool getPool() { ! Thread t = Thread.currentThread(); ! return (t instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread) t).pool : null; } /** * Returns {@code true} if the current thread is a {@link * ForkJoinWorkerThread} executing as a ForkJoinPool computation. --- 1068,1080 ---- * #inForkJoinPool} returns {@code false}. * * @return the pool, or {@code null} if none */ public static ForkJoinPool getPool() { ! Thread t; ! return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread) t).pool : null); } /** * Returns {@code true} if the current thread is a {@link * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
*** 1168,1181 **** * that could have been, but were not, stolen. * * @return {@code true} if unforked */ public boolean tryUnfork() { ! Thread t; ! return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : ! ForkJoinPool.common.tryExternalUnpush(this)); } /** * Returns an estimate of the number of tasks that have been * forked by the current worker thread but not yet executed. This --- 1096,1111 ---- * that could have been, but were not, stolen. * * @return {@code true} if unforked */ public boolean tryUnfork() { ! Thread t; ForkJoinPool.WorkQueue q; ! return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ! ? (q = ((ForkJoinWorkerThread)t).workQueue) != null ! && q.tryUnpush(this) ! : (q = ForkJoinPool.commonQueue()) != null ! && q.externalTryUnpush(this); } /** * Returns an estimate of the number of tasks that have been * forked by the current worker thread but not yet executed. This
*** 1187,1197 **** public static int getQueuedTaskCount() { Thread t; ForkJoinPool.WorkQueue q; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else ! q = ForkJoinPool.commonSubmitterQueue(); return (q == null) ? 0 : q.queueSize(); } /** * Returns an estimate of how many more locally queued tasks are --- 1117,1127 ---- public static int getQueuedTaskCount() { Thread t; ForkJoinPool.WorkQueue q; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else ! q = ForkJoinPool.commonQueue(); return (q == null) ? 0 : q.queueSize(); } /** * Returns an estimate of how many more locally queued tasks are
*** 1262,1272 **** protected static ForkJoinTask<?> peekNextLocalTask() { Thread t; ForkJoinPool.WorkQueue q; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else ! q = ForkJoinPool.commonSubmitterQueue(); return (q == null) ? null : q.peek(); } /** * Unschedules and returns, without executing, the next task --- 1192,1202 ---- protected static ForkJoinTask<?> peekNextLocalTask() { Thread t; ForkJoinPool.WorkQueue q; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else ! q = ForkJoinPool.commonQueue(); return (q == null) ? null : q.peek(); } /** * Unschedules and returns, without executing, the next task
*** 1277,1289 **** * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask<?> pollNextLocalTask() { Thread t; ! return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : ! null; } /** * If the current thread is operating in a ForkJoinPool, * unschedules and returns, without executing, the next task --- 1207,1218 ---- * * @return the next task, or {@code null} if none are available */ protected static ForkJoinTask<?> pollNextLocalTask() { Thread t; ! return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null); } /** * If the current thread is operating in a ForkJoinPool, * unschedules and returns, without executing, the next task
*** 1296,1309 **** * otherwise. * * @return a task, or {@code null} if none are available */ protected static ForkJoinTask<?> pollTask() { ! Thread t; ForkJoinWorkerThread wt; ! return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : ! null; } /** * If the current thread is operating in a ForkJoinPool, * unschedules and returns, without executing, a task externally --- 1225,1238 ---- * otherwise. * * @return a task, or {@code null} if none are available */ protected static ForkJoinTask<?> pollTask() { ! Thread t; ForkJoinWorkerThread w; ! return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) : ! null); } /** * If the current thread is operating in a ForkJoinPool, * unschedules and returns, without executing, a task externally
*** 1315,1326 **** * @return a task, or {@code null} if none are available * @since 9 */ protected static ForkJoinTask<?> pollSubmission() { Thread t; ! return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread)t).pool.pollSubmission() : null; } // tag operations /** --- 1244,1255 ---- * @return a task, or {@code null} if none are available * @since 9 */ protected static ForkJoinTask<?> pollSubmission() { Thread t; ! return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ! ((ForkJoinWorkerThread)t).pool.pollSubmission() : null); } // tag operations /**
*** 1340,1351 **** * @return the previous value of the tag * @since 1.8 */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { ! if (STATUS.weakCompareAndSet(this, s = status, ! (s & ~SMASK) | (newValue & SMASK))) return (short)s; } } /** --- 1269,1279 ---- * @return the previous value of the tag * @since 1.8 */ public final short setForkJoinTaskTag(short newValue) { for (int s;;) { ! if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK))) return (short)s; } } /**
*** 1364,1375 **** */ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) { for (int s;;) { if ((short)(s = status) != expect) return false; ! if (STATUS.weakCompareAndSet(this, s, ! (s & ~SMASK) | (update & SMASK))) return true; } } /** --- 1292,1302 ---- */ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) { for (int s;;) { if ((short)(s = status) != expect) return false; ! if (casStatus(s, (s & ~SMASK) | (update & SMASK))) return true; } } /**
*** 1430,1441 **** this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } ! void internalPropagateException(Throwable ex) { ! rethrow(ex); // rethrow outside exec() catches. } private static final long serialVersionUID = 5232453952276885070L; } /** --- 1357,1377 ---- this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } ! int trySetException(Throwable ex) { // if a handler, invoke it ! int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h; ! if (isExceptionalStatus(s = trySetThrown(ex)) && ! (h = ((t = Thread.currentThread()). ! getUncaughtExceptionHandler())) != null) { ! try { ! h.uncaughtException(t, ex); ! } catch (Throwable ignore) { ! } ! } ! return s; } private static final long serialVersionUID = 5232453952276885070L; } /**
*** 1468,1477 **** --- 1404,1460 ---- return super.toString() + "[Wrapped task = " + callable + "]"; } private static final long serialVersionUID = 2838392045355241008L; } + static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T> + implements RunnableFuture<T> { + @SuppressWarnings("serial") // Conditionally serializable + final Callable<? extends T> callable; + @SuppressWarnings("serial") // Conditionally serializable + transient volatile Thread runner; + T result; + AdaptedInterruptibleCallable(Callable<? extends T> callable) { + if (callable == null) throw new NullPointerException(); + this.callable = callable; + } + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + public final boolean exec() { + Thread.interrupted(); + runner = Thread.currentThread(); + try { + if (!isDone()) // recheck + result = callable.call(); + return true; + } catch (RuntimeException rex) { + throw rex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } finally { + runner = null; + Thread.interrupted(); + } + } + public final void run() { invoke(); } + public final boolean cancel(boolean mayInterruptIfRunning) { + Thread t; + boolean stat = super.cancel(false); + if (mayInterruptIfRunning && (t = runner) != null) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + return stat; + } + public String toString() { + return super.toString() + "[Wrapped task = " + callable + "]"; + } + private static final long serialVersionUID = 2838392045355241008L; + } + /** * Returns a new {@code ForkJoinTask} that performs the {@code run} * method of the given {@code Runnable} as its action, and returns * a null result upon {@link #join}. *
*** 1508,1517 **** --- 1491,1520 ---- */ public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) { return new AdaptedCallable<T>(callable); } + /** + * Returns a new {@code ForkJoinTask} that performs the {@code call} + * method of the given {@code Callable} as its action, and returns + * its result upon {@link #join}, translating any checked exceptions + * encountered into {@code RuntimeException}. Additionally, + * invocations of {@code cancel} with {@code mayInterruptIfRunning + * true} will attempt to interrupt the thread performing the task. + * + * @param callable the callable action + * @param <T> the type of the callable's result + * @return the task + * + * @since 17 + */ + // adaptInterruptible deferred to its own independent change + // https://bugs.openjdk.java.net/browse/JDK-8246587 + /* TODO: public */ private static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) { + return new AdaptedInterruptibleCallable<T>(callable); + } + // Serialization support private static final long serialVersionUID = -7721805057305804111L; /**
*** 1522,1533 **** * @serialData the current run status and the exception thrown * during execution, or {@code null} if none */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { s.defaultWriteObject(); ! s.writeObject(getException()); } /** * Reconstitutes this task from a stream (that is, deserializes it). * @param s the stream --- 1525,1537 ---- * @serialData the current run status and the exception thrown * during execution, or {@code null} if none */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + Aux a; s.defaultWriteObject(); ! s.writeObject((a = aux) == null ? null : a.ex); } /** * Reconstitutes this task from a stream (that is, deserializes it). * @param s the stream
*** 1538,1556 **** private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); Object ex = s.readObject(); if (ex != null) ! setExceptionalCompletion((Throwable)ex); } - // VarHandle mechanics - private static final VarHandle STATUS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } --- 1542,1559 ---- private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); Object ex = s.readObject(); if (ex != null) ! trySetThrown((Throwable)ex); } static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class); + AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } }
< prev index next >