< prev index next >

src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin

@@ -164,10 +164,22 @@
      * node's link to now point to the node itself. This doesn't arise
      * much for Stack nodes (because blocked threads do not hang on to
      * old head pointers), but references in Queue nodes must be
      * aggressively forgotten to avoid reachability of everything any
      * node has ever referred to since arrival.
+     *
+     * The above steps improve throughput when many threads produce
+     * and/or consume data. But they don't help much with
+     * single-source / single-sink usages in which one side or the
+     * other is always transiently blocked, and so throughput is
+     * mainly a function of thread scheduling. This is not usually
+     * noticeably improved with bounded short spin-waits. Instead both
+     * forms of transfer try Thread.yield if apparently the sole
+     * waiter. This works well when there are more tasks than cores,
+     * which is expected to be the main usage context of this mode. In
+     * other cases, waiters may help with some bookkeeping, then
+     * park/unpark.
      */
 
     /**
      * Shared internal API for dual stacks and queues.
      */

@@ -187,31 +199,14 @@
          */
         abstract E transfer(E e, boolean timed, long nanos);
     }
 
     /**
-     * The number of times to spin before blocking in timed waits.
-     * The value is empirically derived -- it works well across a
-     * variety of processors and OSes. Empirically, the best value
-     * seems not to vary with number of CPUs (beyond 2) so is just
-     * a constant.
-     */
-    static final int MAX_TIMED_SPINS =
-        (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
-
-    /**
-     * The number of times to spin before blocking in untimed waits.
-     * This is greater than timed value because untimed waits spin
-     * faster since they don't need to check times on each spin.
-     */
-    static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
-
-    /**
      * The number of nanoseconds for which it is faster to spin
      * rather than to use timed park. A rough estimate suffices.
      */
-    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
+    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
 
     /** Dual stack */
     static final class TransferStack<E> extends Transferer<E> {
         /*
          * This extends Scherer-Scott dual stack algorithm, differing,

@@ -231,11 +226,11 @@
 
         /** Returns true if m has fulfilling bit set. */
         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 
         /** Node class for TransferStacks. */
-        static final class SNode {
+        static final class SNode implements ForkJoinPool.ManagedBlocker {
             volatile SNode next;        // next node in stack
             volatile SNode match;       // the node matched to this
             volatile Thread waiter;     // to control park/unpark
             Object item;                // data; or null for REQUESTs
             int mode;

@@ -259,41 +254,57 @@
              *
              * @param s the node to match
              * @return true if successfully matched to s
              */
             boolean tryMatch(SNode s) {
-                if (match == null &&
-                    SMATCH.compareAndSet(this, null, s)) {
-                    Thread w = waiter;
-                    if (w != null) {    // waiters need at most one unpark
-                        waiter = null;
+                SNode m; Thread w;
+                if ((m = match) == null) {
+                    if (SMATCH.compareAndSet(this, null, s)) {
+                        if ((w = waiter) != null)
                         LockSupport.unpark(w);
-                    }
                     return true;
                 }
-                return match == s;
+                    else
+                        m = match;
+                }
+                return m == s;
             }
 
             /**
              * Tries to cancel a wait by matching node to itself.
              */
-            void tryCancel() {
-                SMATCH.compareAndSet(this, null, this);
+            boolean tryCancel() {
+                return SMATCH.compareAndSet(this, null, this);
             }
 
             boolean isCancelled() {
                 return match == this;
             }
 
+            public final boolean isReleasable() {
+                return match != null || Thread.currentThread().isInterrupted();
+            }
+
+            public final boolean block() {
+                while (!isReleasable()) LockSupport.park();
+                return true;
+            }
+
+            void forgetWaiter() {
+                SWAITER.setOpaque(this, null);
+            }
+
             // VarHandle mechanics
             private static final VarHandle SMATCH;
             private static final VarHandle SNEXT;
+            private static final VarHandle SWAITER;
             static {
                 try {
                     MethodHandles.Lookup l = MethodHandles.lookup();
                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
+                    SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
                 } catch (ReflectiveOperationException e) {
                     throw new ExceptionInInitializerError(e);
                 }
             }
         }

@@ -356,18 +367,47 @@
                         if (h != null && h.isCancelled())
                             casHead(h, h.next);     // pop cancelled node
                         else
                             return null;
                     } else if (casHead(h, s = snode(s, e, h, mode))) {
-                        SNode m = awaitFulfill(s, timed, nanos);
-                        if (m == s) {               // wait was cancelled
-                            clean(s);
+                        long deadline = timed ? System.nanoTime() + nanos : 0L;
+                        Thread w = Thread.currentThread();
+                        int stat = -1; // -1: may yield, +1: park, else 0
+                        SNode m;                    // await fulfill or cancel
+                        while ((m = s.match) == null) {
+                            if ((timed &&
+                                 (nanos = deadline - System.nanoTime()) <= 0) ||
+                                w.isInterrupted()) {
+                                if (s.tryCancel()) {
+                                    clean(s);       // wait cancelled
                             return null;
                         }
-                        if ((h = head) != null && h.next == s)
-                            casHead(h, s.next);     // help s's fulfiller
-                        return (E) ((mode == REQUEST) ? m.item : s.item);
+                            } else if ((m = s.match) != null) {
+                                break;              // recheck
+                            } else if (stat <= 0) {
+                                if (stat < 0 && h == null && head == s) {
+                                    stat = 0;       // yield once if was empty
+                                    Thread.yield();
+                                } else {
+                                    stat = 1;
+                                    s.waiter = w;   // enable signal
+                                }
+                            } else if (!timed) {
+                                LockSupport.setCurrentBlocker(this);
+                                try {
+                                    ForkJoinPool.managedBlock(s);
+                                } catch (InterruptedException cannotHappen) { }
+                                LockSupport.setCurrentBlocker(null);
+                            } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+                                LockSupport.parkNanos(this, nanos);
+                        }
+                        if (stat == 1)
+                            s.forgetWaiter();
+                        Object result = (mode == REQUEST) ? m.item : s.item;
+                        if (h != null && h.next == s)
+                            casHead(h, s.next);     // help fulfiller
+                        return (E) result;
                     }
                 } else if (!isFulfilling(h.mode)) { // try to fulfill
                     if (h.isCancelled())            // already cancelled
                         casHead(h, h.next);         // pop and retry
                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

@@ -400,86 +440,15 @@
                 }
             }
         }
 
         /**
-         * Spins/blocks until node s is matched by a fulfill operation.
-         *
-         * @param s the waiting node
-         * @param timed true if timed wait
-         * @param nanos timeout value
-         * @return matched node, or s if cancelled
-         */
-        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
-            /*
-             * When a node/thread is about to block, it sets its waiter
-             * field and then rechecks state at least one more time
-             * before actually parking, thus covering race vs
-             * fulfiller noticing that waiter is non-null so should be
-             * woken.
-             *
-             * When invoked by nodes that appear at the point of call
-             * to be at the head of the stack, calls to park are
-             * preceded by spins to avoid blocking when producers and
-             * consumers are arriving very close in time.  This can
-             * happen enough to bother only on multiprocessors.
-             *
-             * The order of checks for returning out of main loop
-             * reflects fact that interrupts have precedence over
-             * normal returns, which have precedence over
-             * timeouts. (So, on timeout, one last check for match is
-             * done before giving up.) Except that calls from untimed
-             * SynchronousQueue.{poll/offer} don't check interrupts
-             * and don't wait at all, so are trapped in transfer
-             * method rather than calling awaitFulfill.
-             */
-            final long deadline = timed ? System.nanoTime() + nanos : 0L;
-            Thread w = Thread.currentThread();
-            int spins = shouldSpin(s)
-                ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
-                : 0;
-            for (;;) {
-                if (w.isInterrupted())
-                    s.tryCancel();
-                SNode m = s.match;
-                if (m != null)
-                    return m;
-                if (timed) {
-                    nanos = deadline - System.nanoTime();
-                    if (nanos <= 0L) {
-                        s.tryCancel();
-                        continue;
-                    }
-                }
-                if (spins > 0) {
-                    Thread.onSpinWait();
-                    spins = shouldSpin(s) ? (spins - 1) : 0;
-                }
-                else if (s.waiter == null)
-                    s.waiter = w; // establish waiter so can park next iter
-                else if (!timed)
-                    LockSupport.park(this);
-                else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
-                    LockSupport.parkNanos(this, nanos);
-            }
-        }
-
-        /**
-         * Returns true if node s is at head or there is an active
-         * fulfiller.
-         */
-        boolean shouldSpin(SNode s) {
-            SNode h = head;
-            return (h == s || h == null || isFulfilling(h.mode));
-        }
-
-        /**
          * Unlinks s from the stack.
          */
         void clean(SNode s) {
             s.item = null;   // forget item
-            s.waiter = null; // forget thread
+            s.forgetWaiter();
 
             /*
              * At worst we may need to traverse entire stack to unlink
              * s. If there are multiple concurrent calls to clean, we
              * might not see s if another thread has already removed

@@ -531,11 +500,11 @@
          * nodes, and matching is done by CAS'ing QNode.item field
          * from non-null to null (for put) or vice versa (for take).
          */
 
         /** Node class for TransferQueue. */
-        static final class QNode {
+        static final class QNode implements ForkJoinPool.ManagedBlocker {
             volatile QNode next;          // next node in queue
             volatile Object item;         // CAS'ed to or from null
             volatile Thread waiter;       // to control park/unpark
             final boolean isData;
 

@@ -555,12 +524,12 @@
             }
 
             /**
              * Tries to cancel by CAS'ing ref to this as item.
              */
-            void tryCancel(Object cmp) {
-                QITEM.compareAndSet(this, cmp, this);
+            boolean tryCancel(Object cmp) {
+                return QITEM.compareAndSet(this, cmp, this);
             }
 
             boolean isCancelled() {
                 return item == this;
             }

@@ -572,18 +541,40 @@
              */
             boolean isOffList() {
                 return next == this;
             }
 
+            void forgetWaiter() {
+                QWAITER.setOpaque(this, null);
+            }
+
+            boolean isFulfilled() {
+                Object x;
+                return isData == ((x = item) == null) || x == this;
+            }
+
+            public final boolean isReleasable() {
+                Object x;
+                return isData == ((x = item) == null) || x == this ||
+                    Thread.currentThread().isInterrupted();
+            }
+
+            public final boolean block() {
+                while (!isReleasable()) LockSupport.park();
+                return true;
+            }
+
             // VarHandle mechanics
             private static final VarHandle QITEM;
             private static final VarHandle QNEXT;
+            private static final VarHandle QWAITER;
             static {
                 try {
                     MethodHandles.Lookup l = MethodHandles.lookup();
                     QITEM = l.findVarHandle(QNode.class, "item", Object.class);
                     QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
+                    QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
                 } catch (ReflectiveOperationException e) {
                     throw new ExceptionInInitializerError(e);
                 }
             }
         }

@@ -661,114 +652,89 @@
              * than having them implicitly interspersed.
              */
 
             QNode s = null; // constructed/reused as needed
             boolean isData = (e != null);
-
             for (;;) {
-                QNode t = tail;
-                QNode h = head;
-                if (t == null || h == null)         // saw uninitialized value
-                    continue;                       // spin
-
-                if (h == t || t.isData == isData) { // empty or same-mode
-                    QNode tn = t.next;
-                    if (t != tail)                  // inconsistent read
-                        continue;
-                    if (tn != null) {               // lagging tail
+                QNode t = tail, h = head, m, tn;         // m is node to fulfill
+                if (t == null || h == null)
+                    ;                                    // inconsistent
+                else if (h == t || t.isData == isData) { // empty or same-mode
+                    if (t != tail)                       // inconsistent
+                        ;
+                    else if ((tn = t.next) != null)      // lagging tail
                         advanceTail(t, tn);
-                        continue;
-                    }
-                    if (timed && nanos <= 0L)       // can't wait
+                    else if (timed && nanos <= 0L)       // can't wait
                         return null;
-                    if (s == null)
-                        s = new QNode(e, isData);
-                    if (!t.casNext(null, s))        // failed to link in
-                        continue;
-
-                    advanceTail(t, s);              // swing tail and wait
-                    Object x = awaitFulfill(s, e, timed, nanos);
-                    if (x == s) {                   // wait was cancelled
+                    else if (t.casNext(null, (s != null) ? s :
+                                       (s = new QNode(e, isData)))) {
+                        advanceTail(t, s);
+                        long deadline = timed ? System.nanoTime() + nanos : 0L;
+                        Thread w = Thread.currentThread();
+                        int stat = -1; // same idea as TransferStack
+                        Object item;
+                        while ((item = s.item) == e) {
+                            if ((timed &&
+                                 (nanos = deadline - System.nanoTime()) <= 0) ||
+                                w.isInterrupted()) {
+                                if (s.tryCancel(e)) {
                         clean(t, s);
                         return null;
                     }
-
+                            } else if ((item = s.item) != e) {
+                                break;                   // recheck
+                            } else if (stat <= 0) {
+                                if (t.next == s) {
+                                    if (stat < 0 && t.isFulfilled()) {
+                                        stat = 0;        // yield once if first
+                                        Thread.yield();
+                                    }
+                                    else {
+                                        stat = 1;
+                                        s.waiter = w;
+                                    }
+                                }
+                            } else if (!timed) {
+                                LockSupport.setCurrentBlocker(this);
+                                try {
+                                    ForkJoinPool.managedBlock(s);
+                                } catch (InterruptedException cannotHappen) { }
+                                LockSupport.setCurrentBlocker(null);
+                            }
+                            else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+                                LockSupport.parkNanos(this, nanos);
+                        }
+                        if (stat == 1)
+                            s.forgetWaiter();
                     if (!s.isOffList()) {           // not already unlinked
                         advanceHead(t, s);          // unlink if head
-                        if (x != null)              // and forget fields
+                            if (item != null)            // and forget fields
                             s.item = s;
-                        s.waiter = null;
                     }
-                    return (x != null) ? (E)x : e;
-
-                } else {                            // complementary-mode
-                    QNode m = h.next;               // node to fulfill
-                    if (t != tail || m == null || h != head)
-                        continue;                   // inconsistent read
-
-                    Object x = m.item;
-                    if (isData == (x != null) ||    // m already fulfilled
-                        x == m ||                   // m cancelled
-                        !m.casItem(x, e)) {         // lost CAS
-                        advanceHead(h, m);          // dequeue and retry
-                        continue;
+                        return (item != null) ? (E)item : e;
                     }
 
-                    advanceHead(h, m);              // successfully fulfilled
-                    LockSupport.unpark(m.waiter);
+                } else if ((m = h.next) != null && t == tail && h == head) {
+                    Thread waiter;
+                    Object x = m.item;
+                    boolean fulfilled = ((isData == (x == null)) &&
+                                         x != m && m.casItem(x, e));
+                    advanceHead(h, m);                    // (help) dequeue
+                    if (fulfilled) {
+                        if ((waiter = m.waiter) != null)
+                            LockSupport.unpark(waiter);
                     return (x != null) ? (E)x : e;
                 }
             }
         }
-
-        /**
-         * Spins/blocks until node s is fulfilled.
-         *
-         * @param s the waiting node
-         * @param e the comparison value for checking match
-         * @param timed true if timed wait
-         * @param nanos timeout value
-         * @return matched item, or s if cancelled
-         */
-        Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
-            /* Same idea as TransferStack.awaitFulfill */
-            final long deadline = timed ? System.nanoTime() + nanos : 0L;
-            Thread w = Thread.currentThread();
-            int spins = (head.next == s)
-                ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
-                : 0;
-            for (;;) {
-                if (w.isInterrupted())
-                    s.tryCancel(e);
-                Object x = s.item;
-                if (x != e)
-                    return x;
-                if (timed) {
-                    nanos = deadline - System.nanoTime();
-                    if (nanos <= 0L) {
-                        s.tryCancel(e);
-                        continue;
-                    }
-                }
-                if (spins > 0) {
-                    --spins;
-                    Thread.onSpinWait();
-                }
-                else if (s.waiter == null)
-                    s.waiter = w;
-                else if (!timed)
-                    LockSupport.park(this);
-                else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
-                    LockSupport.parkNanos(this, nanos);
-            }
         }
 
         /**
          * Gets rid of cancelled node s with original predecessor pred.
          */
         void clean(QNode pred, QNode s) {
-            s.waiter = null; // forget thread
+            s.forgetWaiter();
             /*
              * At any given time, exactly one node on list cannot be
              * deleted -- the last inserted node. To accommodate this,
              * if we cannot delete s, we save its predecessor as
              * "cleanMe", deleting the previously saved version
< prev index next >