src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java

8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin

@@ -307,35 +307,16 @@
      *    CAS-advance the tail pointer by at least two hops.
      *
      * 2. Await match or cancellation (method awaitMatch)
      *
      *    Wait for another thread to match node; instead cancelling if
-     *    the current thread was interrupted or the wait timed out. On
-     *    multiprocessors, we use front-of-queue spinning: If a node
-     *    appears to be the first unmatched node in the queue, it
-     *    spins a bit before blocking. In either case, before blocking
-     *    it tries to unsplice any nodes between the current "head"
-     *    and the first unmatched node.
-     *
-     *    Front-of-queue spinning vastly improves performance of
-     *    heavily contended queues. And so long as it is relatively
-     *    brief and "quiet", spinning does not much impact performance
-     *    of less-contended queues.  During spins threads check their
-     *    interrupt status and generate a thread-local random number
-     *    to decide to occasionally perform a Thread.yield. While
-     *    yield has underdefined specs, we assume that it might help,
-     *    and will not hurt, in limiting impact of spinning on busy
-     *    systems.  We also use smaller (1/2) spins for nodes that are
-     *    not known to be front but whose predecessors have not
-     *    blocked -- these "chained" spins avoid artifacts of
-     *    front-of-queue rules which otherwise lead to alternating
-     *    nodes spinning vs blocking. Further, front threads that
-     *    represent phase changes (from data to request node or vice
-     *    versa) compared to their predecessors receive additional
-     *    chained spins, reflecting longer paths typically required to
-     *    unblock threads during phase changes.
-     *
+     *    the current thread was interrupted or the wait timed out. To
+     *    improve performance in common single-source / single-sink
+     *    usages when there are more tasks than cores, an initial
+     *    Thread.yield is tried when there is apparently only one
+     *    waiter.  In other cases, waiters may help with some
+     *    bookkeeping, then park/unpark.
      *
      * ** Unlinking removed interior nodes **
      *
      * In addition to minimizing garbage retention via self-linking
      * described above, we also unlink removed interior nodes. These
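
The "single-source / single-sink" usage that the revised comment in this hunk refers to can be pictured with a minimal sketch: one producer hands elements to one consumer through `transfer` and `take`. The class name `HandoffSketch` and the method `run` are hypothetical and are not part of the patch; only the public `LinkedTransferQueue` API is used.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical illustration, not part of the patch.
final class HandoffSketch {
    /** Hands the values 1..count from one producer to one consumer; returns the sum received. */
    static long run(int count) {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++)
                    queue.transfer(i);          // waits until the consumer receives i
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);               // do not keep the JVM alive if the consumer stops early
        producer.start();

        long sum = 0;
        try {
            for (int i = 0; i < count; i++)
                sum += queue.take();            // waits for the next element
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run(1000));
    }
}
```

In this pattern there is at most one waiting node at a time, which is the situation in which, according to the comment, the waiter first tries a single `Thread.yield` before parking.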

@@ -367,34 +348,13 @@
      * never fall off the list because of an untimed call to take() at
      * the front of the queue.
      *
      * When these cases arise, rather than always retraversing the
      * entire list to find an actual predecessor to unlink (which
-     * won't help for case (1) anyway), we record a conservative
-     * estimate of possible unsplice failures (in "sweepVotes").
-     * We trigger a full sweep when the estimate exceeds a threshold
-     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
-     * removal failures to tolerate before sweeping through, unlinking
-     * cancelled nodes that were not unlinked upon initial removal.
-     * We perform sweeps by the thread hitting threshold (rather than
-     * background threads or by spreading work to other threads)
-     * because in the main contexts in which removal occurs, the
-     * caller is timed-out or cancelled, which are not time-critical
-     * enough to warrant the overhead that alternatives would impose
-     * on other threads.
-     *
-     * Because the sweepVotes estimate is conservative, and because
-     * nodes become unlinked "naturally" as they fall off the head of
-     * the queue, and because we allow votes to accumulate even while
-     * sweeps are in progress, there are typically significantly fewer
-     * such nodes than estimated.  Choice of a threshold value
-     * balances the likelihood of wasted effort and contention, versus
-     * providing a worst-case bound on retention of interior nodes in
-     * quiescent queues. The value defined below was chosen
-     * empirically to balance these under various timeout scenarios.
-     *
-     * Because traversal operations on the linked list of nodes are a
+     * won't help for case (1) anyway), we record the need to sweep the
+     * next time any thread would otherwise block in awaitMatch. Also,
+     * because traversal operations on the linked list of nodes are a
      * natural opportunity to sweep dead nodes, we generally do so,
      * including all the operations that might remove elements as they
      * traverse, such as removeIf and Iterator.remove.  This largely
      * eliminates long chains of dead interior nodes, except from
      * cancelled or timed out blocking operations.

@@ -403,32 +363,16 @@
      * sweeps. However, the associated garbage chains terminate when
      * some successor ultimately falls off the head of the list and is
      * self-linked.
      */
 
-    /** True if on multiprocessor */
-    private static final boolean MP =
-        Runtime.getRuntime().availableProcessors() > 1;
-
-    /**
-     * The number of times to spin (with randomly interspersed calls
-     * to Thread.yield) on multiprocessor before blocking when a node
-     * is apparently the first waiter in the queue.  See above for
-     * explanation. Must be a power of two. The value is empirically
-     * derived -- it works pretty well across a variety of processors,
-     * numbers of CPUs, and OSes.
-     */
-    private static final int FRONT_SPINS   = 1 << 7;
-
     /**
-     * The number of times to spin before blocking when a node is
-     * preceded by another node that is apparently spinning.  Also
-     * serves as an increment to FRONT_SPINS on phase changes, and as
-     * base average frequency for yielding during spins. Must be a
-     * power of two.
+     * The number of nanoseconds for which it is faster to spin
+     * rather than to use timed park. A rough estimate suffices.
+     * Using a power of two minus one simplifies some comparisons.
      */
-    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
+    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
 
     /**
      * The maximum number of estimated removal failures (sweepVotes)
      * to tolerate before sweeping through the queue unlinking
      * cancelled nodes that were not unlinked upon initial
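
The role of `SPIN_FOR_TIMEOUT_THRESHOLD` can be illustrated with a small stand-alone sketch of a timed wait: the thread parks only when the remaining time exceeds the threshold and otherwise just rechecks the condition. The class `TimedWaitSketch`, its methods, and the use of an `AtomicBoolean` as the awaited condition are assumptions made for illustration. The call to `Thread.onSpinWait()` is also an addition of this sketch; the patched `awaitMatch` simply loops again.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration, not part of the patch.
final class TimedWaitSketch {
    // Same value as in the patch: below this many nanoseconds, rechecking is assumed cheaper than a timed park.
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

    /** Waits until flag is set or the timeout elapses; returns whether the flag was seen set. */
    static boolean await(AtomicBoolean flag, long timeout, TimeUnit unit) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!flag.get()) {
            long nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return flag.get();                   // timed out; report the final state
            if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(flag, nanos);  // long wait: block (may return early)
            else
                Thread.onSpinWait();                 // short wait: recheck without parking
        }
        return true;
    }

    static boolean demoAlreadySet() {
        return await(new AtomicBoolean(true), 1, TimeUnit.SECONDS);
    }

    static boolean demoTimesOut() {
        return await(new AtomicBoolean(false), 2, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demoAlreadySet() + " " + demoTimesOut());
    }
}
```

This sketch has no thread that sets the flag and unparks the waiter; in the queue, the matching thread performs that wakeup through the node's `waiter` field.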

@@ -440,11 +384,11 @@
     /**
      * Queue nodes. Uses Object, not E, for items to allow forgetting
      * them after use.  Writes that are intrinsically ordered wrt
      * other accesses or CASes use simple relaxed forms.
      */
-    static final class Node {
+    static final class Node implements ForkJoinPool.ManagedBlocker {
         final boolean isData;   // false if this is a request node
         volatile Object item;   // initially non-null if isData; CASed to match
         volatile Node next;
         volatile Thread waiter; // null when not waiting for a match
 

@@ -485,28 +429,11 @@
         }
 
         final void appendRelaxed(Node next) {
             // assert next != null;
             // assert this.next == null;
-            NEXT.set(this, next);
-        }
-
-        /**
-         * Sets item (of a request node) to self and waiter to null,
-         * to avoid garbage retention after matching or cancelling.
-         * Uses relaxed writes because order is already constrained in
-         * the only calling contexts: item is forgotten only after
-         * volatile/atomic mechanics that extract items, and visitors
-         * of request nodes only ever check whether item is null.
-         * Similarly, clearing waiter follows either CAS or return
-         * from park (if ever parked; else we don't care).
-         */
-        final void forgetContents() {
-            // assert isMatched();
-            if (!isData)
-                ITEM.set(this, this);
-            WAITER.set(this, null);
+            NEXT.setOpaque(this, next);
         }
 
         /**
          * Returns true if this node has been matched, including the
          * case of artificial matches due to cancellation.

@@ -532,10 +459,20 @@
         final boolean cannotPrecede(boolean haveData) {
             boolean d = isData;
             return d != haveData && d != (item == null);
         }
 
+        public final boolean isReleasable() {
+            return (isData == (item == null)) ||
+                Thread.currentThread().isInterrupted();
+        }
+
+        public final boolean block() {
+            while (!isReleasable()) LockSupport.park();
+            return true;
+        }
+
         private static final long serialVersionUID = -3375979862319811754L;
     }
 
     /**
      * A node from which the first live (non-matched) node (if any)
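
The `isReleasable`/`block` pair added to `Node` in this hunk follows the `ForkJoinPool.ManagedBlocker` protocol. A minimal stand-alone sketch of that protocol is shown here, using a hypothetical `SlotBlocker` class that waits for a value instead of a queue match; all names in it are illustrative and none of them come from the patch.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration, not part of the patch.
final class SlotBlocker implements ForkJoinPool.ManagedBlocker {
    private final AtomicReference<String> slot = new AtomicReference<>();
    private volatile Thread waiter;

    /** True once a value is available or the calling thread has been interrupted. */
    @Override
    public boolean isReleasable() {
        return slot.get() != null || Thread.currentThread().isInterrupted();
    }

    /** Parks until releasable; returning true means no further blocking is needed. */
    @Override
    public boolean block() {
        while (!isReleasable())
            LockSupport.park(this);
        return true;
    }

    /** Publishes a value and wakes the waiter, if one is registered. */
    void fill(String value) {
        slot.set(value);
        LockSupport.unpark(waiter);       // unpark(null) has no effect
    }

    /** Waits for a value; returns null if interrupted before one arrives. */
    String await() throws InterruptedException {
        waiter = Thread.currentThread();  // register before the first isReleasable check
        ForkJoinPool.managedBlock(this);
        waiter = null;
        return slot.get();
    }

    static String demo() {
        SlotBlocker blocker = new SlotBlocker();
        new Thread(() -> blocker.fill("done")).start();
        try {
            return blocker.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When the caller is a `ForkJoinPool` worker, `managedBlock` lets the pool compensate for the blocked worker; for any other thread it reduces to the `isReleasable`/`block` loop, so the same code path serves both cases.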

@@ -564,11 +501,11 @@
      * - tail.next may or may not be self-linked.
      */
     private transient volatile Node tail;
 
     /** The number of apparent failures to unsplice cancelled nodes */
-    private transient volatile int sweepVotes;
+    private transient volatile boolean needSweep;
 
     private boolean casTail(Node cmp, Node val) {
         // assert cmp != null;
         // assert val != null;
         return TAIL.compareAndSet(this, cmp, val);

@@ -576,15 +513,10 @@
 
     private boolean casHead(Node cmp, Node val) {
         return HEAD.compareAndSet(this, cmp, val);
     }
 
-    /** Atomic version of ++sweepVotes. */
-    private int incSweepVotes() {
-        return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
-    }
-
     /**
      * Tries to CAS pred.next (or head, if pred is null) from c to p.
      * Caller must ensure that we're not unlinking the trailing node.
      */
     private boolean tryCasSuccessor(Node pred, Node c, Node p) {

@@ -687,80 +619,70 @@
             }
         }
     }
 
     /**
-     * Spins/yields/blocks until node s is matched or caller gives up.
+     * Possibly blocks until node s is matched or caller gives up.
      *
      * @param s the waiting node
      * @param pred the predecessor of s, or null if unknown (the null
      * case does not occur in any current calls but may in possible
      * future extensions)
      * @param e the comparison value for checking match
      * @param timed if true, wait only until timeout elapses
      * @param nanos timeout in nanosecs, used only if timed is true
      * @return matched item, or e if unmatched on interrupt or timeout
      */
+    @SuppressWarnings("unchecked")
     private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+        final boolean isData = s.isData;
         final long deadline = timed ? System.nanoTime() + nanos : 0L;
-        Thread w = Thread.currentThread();
-        int spins = -1; // initialized after first item and cancel checks
-        ThreadLocalRandom randomYields = null; // bound if needed
-
-        for (;;) {
-            final Object item;
-            if ((item = s.item) != e) {       // matched
-                // assert item != s;
-                s.forgetContents();           // avoid garbage
-                @SuppressWarnings("unchecked") E itemE = (E) item;
-                return itemE;
-            }
-            else if (w.isInterrupted() || (timed && nanos <= 0L)) {
-                // try to cancel and unlink
-                if (s.casItem(e, s.isData ? null : s)) {
-                    unsplice(pred, s);
+        final Thread w = Thread.currentThread();
+        int stat = -1;                   // -1: may yield, +1: park, else 0
+        Object item;
+        while ((item = s.item) == e) {
+            if (needSweep)               // help clean
+                sweep();
+            else if ((timed && nanos <= 0L) || w.isInterrupted()) {
+                if (s.casItem(e, (e == null) ? s : null)) {
+                    unsplice(pred, s);   // cancelled
                     return e;
                 }
-                // return normally if lost CAS
             }
-            else if (spins < 0) {            // establish spins at/near front
-                if ((spins = spinsFor(pred, s.isData)) > 0)
-                    randomYields = ThreadLocalRandom.current();
+            else if (stat <= 0) {
+                if (pred != null && pred.next == s) {
+                    if (stat < 0 &&
+                        (pred.isData != isData || pred.isMatched())) {
+                        stat = 0;        // yield once if first
+                        Thread.yield();
             }
-            else if (spins > 0) {             // spin
-                --spins;
-                if (randomYields.nextInt(CHAINED_SPINS) == 0)
-                    Thread.yield();           // occasionally yield
+                    else {
+                        stat = 1;
+                        s.waiter = w;    // enable unpark
             }
-            else if (s.waiter == null) {
-                s.waiter = w;                 // request unpark then recheck
+                }                        // else signal in progress
             }
-            else if (timed) {
-                nanos = deadline - System.nanoTime();
-                if (nanos > 0L)
-                    LockSupport.parkNanos(this, nanos);
+            else if ((item = s.item) != e)
+                break;                   // recheck
+            else if (!timed) {
+                LockSupport.setCurrentBlocker(this);
+                try {
+                    ForkJoinPool.managedBlock(s);
+                } catch (InterruptedException cannotHappen) { }
+                LockSupport.setCurrentBlocker(null);
             }
             else {
-                LockSupport.park(this);
-            }
-        }
+                nanos = deadline - System.nanoTime();
+                if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+                    LockSupport.parkNanos(this, nanos);
     }
-
-    /**
-     * Returns spin/yield value for a node with given predecessor and
-     * data mode. See above for explanation.
-     */
-    private static int spinsFor(Node pred, boolean haveData) {
-        if (MP && pred != null) {
-            if (pred.isData != haveData)      // phase change
-                return FRONT_SPINS + CHAINED_SPINS;
-            if (pred.isMatched())             // probably at front
-                return FRONT_SPINS;
-            if (pred.waiter == null)          // pred apparently spinning
-                return CHAINED_SPINS;
         }
-        return 0;
+        if (stat == 1)
+            WAITER.set(s, null);
+        if (!isData)
+            ITEM.set(s, s);              // self-link to avoid garbage
+        return (E) item;
     }
 
     /* -------------- Traversal methods -------------- */
 
     /**

@@ -1179,12 +1101,11 @@
         s.waiter = null; // disable signals
         /*
          * See above for rationale. Briefly: if pred still points to
          * s, try to unlink s.  If s cannot be unlinked, because it is
          * trailing node or pred might be unlinked, and neither pred
-         * nor s are head or offlist, add to sweepVotes, and if enough
-         * votes have accumulated, sweep.
+         * nor s are head or offlist, set needSweep.
          */
         if (pred != null && pred.next == s) {
             Node n = s.next;
             if (n == null ||
                 (n != s && pred.casNext(s, n) && pred.isMatched())) {

@@ -1198,23 +1119,22 @@
                     if (hn == null)
                         return;          // now empty
                     if (hn != h && casHead(h, hn))
                         h.selfLink();  // advance head
                 }
-                // sweep every SWEEP_THRESHOLD votes
-                if (pred.next != pred && s.next != s // recheck if offlist
-                    && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
-                    sweep();
+                if (pred.next != pred && s.next != s)
+                    needSweep = true;
             }
         }
     }
 
     /**
      * Unlinks matched (typically cancelled) nodes encountered in a
      * traversal from head.
      */
     private void sweep() {
+        needSweep = false;
         for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
             if (!s.isMatched())
                 // Unmatched nodes are never self-linked
                 p = s;
             else if ((n = s.next) == null) // trailing node is pinned

@@ -1263,11 +1183,11 @@
      * As the queue is unbounded, this method will never block.
      *
      * @throws NullPointerException if the specified element is null
      */
     public void put(E e) {
-        xfer(e, true, ASYNC, 0);
+        xfer(e, true, ASYNC, 0L);
     }
 
     /**
      * Inserts the specified element at the tail of this queue.
      * As the queue is unbounded, this method will never block or

@@ -1276,11 +1196,11 @@
      * @return {@code true} (as specified by
      *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
      * @throws NullPointerException if the specified element is null
      */
     public boolean offer(E e, long timeout, TimeUnit unit) {
-        xfer(e, true, ASYNC, 0);
+        xfer(e, true, ASYNC, 0L);
         return true;
     }
 
     /**
      * Inserts the specified element at the tail of this queue.

@@ -1288,11 +1208,11 @@
      *
      * @return {@code true} (as specified by {@link Queue#offer})
      * @throws NullPointerException if the specified element is null
      */
     public boolean offer(E e) {
-        xfer(e, true, ASYNC, 0);
+        xfer(e, true, ASYNC, 0L);
         return true;
     }
 
     /**
      * Inserts the specified element at the tail of this queue.

@@ -1301,11 +1221,11 @@
      *
      * @return {@code true} (as specified by {@link Collection#add})
      * @throws NullPointerException if the specified element is null
      */
     public boolean add(E e) {
-        xfer(e, true, ASYNC, 0);
+        xfer(e, true, ASYNC, 0L);
         return true;
     }
 
     /**
      * Transfers the element to a waiting consumer immediately, if possible.

@@ -1316,11 +1236,11 @@
      * otherwise returning {@code false} without enqueuing the element.
      *
      * @throws NullPointerException if the specified element is null
      */
     public boolean tryTransfer(E e) {
-        return xfer(e, true, NOW, 0) == null;
+        return xfer(e, true, NOW, 0L) == null;
     }
 
     /**
      * Transfers the element to a consumer, waiting if necessary to do so.
      *

@@ -1331,11 +1251,11 @@
      * and waits until the element is received by a consumer.
      *
      * @throws NullPointerException if the specified element is null
      */
     public void transfer(E e) throws InterruptedException {
-        if (xfer(e, true, SYNC, 0) != null) {
+        if (xfer(e, true, SYNC, 0L) != null) {
             Thread.interrupted(); // failure possible only due to interrupt
             throw new InterruptedException();
         }
     }
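
The public methods touched by the `0` to `0L` change differ only in the mode they pass to `xfer`. The observable difference between the `NOW` mode of `tryTransfer` and the `ASYNC` mode of `offer` can be checked with a short sketch when no consumer is waiting. The class `TransferModesSketch` and its method names are hypothetical; the behavior asserted is the one documented in the Javadoc of these methods.

```java
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical illustration, not part of the patch.
final class TransferModesSketch {
    /** With no waiting consumer, tryTransfer returns false and leaves the queue empty. */
    static boolean tryTransferLeavesQueueEmpty() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        boolean handedOff = q.tryTransfer("x");
        return !handedOff && q.isEmpty();
    }

    /** With no waiting consumer, offer enqueues the element and returns immediately. */
    static int sizeAfterOffer() {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        q.offer("x");
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(tryTransferLeavesQueueEmpty() + " " + sizeAfterOffer());
    }
}
```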
 

@@ -1361,11 +1281,11 @@
             return false;
         throw new InterruptedException();
     }
 
     public E take() throws InterruptedException {
-        E e = xfer(null, false, SYNC, 0);
+        E e = xfer(null, false, SYNC, 0L);
         if (e != null)
             return e;
         Thread.interrupted();
         throw new InterruptedException();
     }

@@ -1376,11 +1296,11 @@
             return e;
         throw new InterruptedException();
     }
 
     public E poll() {
-        return xfer(null, false, NOW, 0);
+        return xfer(null, false, NOW, 0L);
     }
 
     /**
      * @throws NullPointerException     {@inheritDoc}
      * @throws IllegalArgumentException {@inheritDoc}

@@ -1720,23 +1640,20 @@
     }
 
     // VarHandle mechanics
     private static final VarHandle HEAD;
     private static final VarHandle TAIL;
-    private static final VarHandle SWEEPVOTES;
     static final VarHandle ITEM;
     static final VarHandle NEXT;
     static final VarHandle WAITER;
     static {
         try {
             MethodHandles.Lookup l = MethodHandles.lookup();
             HEAD = l.findVarHandle(LinkedTransferQueue.class, "head",
                                    Node.class);
             TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
                                    Node.class);
-            SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
-                                         int.class);
             ITEM = l.findVarHandle(Node.class, "item", Object.class);
             NEXT = l.findVarHandle(Node.class, "next", Node.class);
             WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
         } catch (ReflectiveOperationException e) {
             throw new ExceptionInInitializerError(e);