< prev index next >
src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java
Print this page
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin
@@ -164,10 +164,22 @@
* node's link to now point to the node itself. This doesn't arise
* much for Stack nodes (because blocked threads do not hang on to
* old head pointers), but references in Queue nodes must be
* aggressively forgotten to avoid reachability of everything any
* node has ever referred to since arrival.
+ *
+ * The above steps improve throughput when many threads produce
+ * and/or consume data. But they don't help much with
+ * single-source / single-sink usages in which one side or the
+ * other is always transiently blocked, and so throughput is
+ * mainly a function of thread scheduling. This is not usually
+ * noticeably improved with bounded short spin-waits. Instead both
+ * forms of transfer try Thread.yield if apparently the sole
+ * waiter. This works well when there are more tasks than cores,
+ * which is expected to be the main usage context of this mode. In
+ * other cases, waiters may help with some bookkeeping, then
+ * park/unpark.
*/
/**
* Shared internal API for dual stacks and queues.
*/
@@ -187,31 +199,14 @@
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
- * The number of times to spin before blocking in timed waits.
- * The value is empirically derived -- it works well across a
- * variety of processors and OSes. Empirically, the best value
- * seems not to vary with number of CPUs (beyond 2) so is just
- * a constant.
- */
- static final int MAX_TIMED_SPINS =
- (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
-
- /**
- * The number of times to spin before blocking in untimed waits.
- * This is greater than timed value because untimed waits spin
- * faster since they don't need to check times on each spin.
- */
- static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
-
- /**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
- static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
+ static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
/** Dual stack */
static final class TransferStack<E> extends Transferer<E> {
/*
* This extends Scherer-Scott dual stack algorithm, differing,
@@ -231,11 +226,11 @@
/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** Node class for TransferStacks. */
- static final class SNode {
+ static final class SNode implements ForkJoinPool.ManagedBlocker {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
@@ -259,41 +254,57 @@
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
- if (match == null &&
- SMATCH.compareAndSet(this, null, s)) {
- Thread w = waiter;
- if (w != null) { // waiters need at most one unpark
- waiter = null;
+ SNode m; Thread w;
+ if ((m = match) == null) {
+ if (SMATCH.compareAndSet(this, null, s)) {
+ if ((w = waiter) != null)
LockSupport.unpark(w);
- }
return true;
}
- return match == s;
+ else
+ m = match;
+ }
+ return m == s;
}
/**
* Tries to cancel a wait by matching node to itself.
*/
- void tryCancel() {
- SMATCH.compareAndSet(this, null, this);
+ boolean tryCancel() {
+ return SMATCH.compareAndSet(this, null, this);
}
boolean isCancelled() {
return match == this;
}
+ public final boolean isReleasable() {
+ return match != null || Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
+ }
+
+ void forgetWaiter() {
+ SWAITER.setOpaque(this, null);
+ }
+
// VarHandle mechanics
private static final VarHandle SMATCH;
private static final VarHandle SNEXT;
+ private static final VarHandle SWAITER;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
+ SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
@@ -356,18 +367,47 @@
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
- SNode m = awaitFulfill(s, timed, nanos);
- if (m == s) { // wait was cancelled
- clean(s);
+ long deadline = timed ? System.nanoTime() + nanos : 0L;
+ Thread w = Thread.currentThread();
+ int stat = -1; // -1: may yield, +1: park, else 0
+ SNode m; // await fulfill or cancel
+ while ((m = s.match) == null) {
+ if ((timed &&
+ (nanos = deadline - System.nanoTime()) <= 0) ||
+ w.isInterrupted()) {
+ if (s.tryCancel()) {
+ clean(s); // wait cancelled
return null;
}
- if ((h = head) != null && h.next == s)
- casHead(h, s.next); // help s's fulfiller
- return (E) ((mode == REQUEST) ? m.item : s.item);
+ } else if ((m = s.match) != null) {
+ break; // recheck
+ } else if (stat <= 0) {
+ if (stat < 0 && h == null && head == s) {
+ stat = 0; // yield once if was empty
+ Thread.yield();
+ } else {
+ stat = 1;
+ s.waiter = w; // enable signal
+ }
+ } else if (!timed) {
+ LockSupport.setCurrentBlocker(this);
+ try {
+ ForkJoinPool.managedBlock(s);
+ } catch (InterruptedException cannotHappen) { }
+ LockSupport.setCurrentBlocker(null);
+ } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+ LockSupport.parkNanos(this, nanos);
+ }
+ if (stat == 1)
+ s.forgetWaiter();
+ Object result = (mode == REQUEST) ? m.item : s.item;
+ if (h != null && h.next == s)
+ casHead(h, s.next); // help fulfiller
+ return (E) result;
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
@@ -400,86 +440,15 @@
}
}
}
/**
- * Spins/blocks until node s is matched by a fulfill operation.
- *
- * @param s the waiting node
- * @param timed true if timed wait
- * @param nanos timeout value
- * @return matched node, or s if cancelled
- */
- SNode awaitFulfill(SNode s, boolean timed, long nanos) {
- /*
- * When a node/thread is about to block, it sets its waiter
- * field and then rechecks state at least one more time
- * before actually parking, thus covering race vs
- * fulfiller noticing that waiter is non-null so should be
- * woken.
- *
- * When invoked by nodes that appear at the point of call
- * to be at the head of the stack, calls to park are
- * preceded by spins to avoid blocking when producers and
- * consumers are arriving very close in time. This can
- * happen enough to bother only on multiprocessors.
- *
- * The order of checks for returning out of main loop
- * reflects fact that interrupts have precedence over
- * normal returns, which have precedence over
- * timeouts. (So, on timeout, one last check for match is
- * done before giving up.) Except that calls from untimed
- * SynchronousQueue.{poll/offer} don't check interrupts
- * and don't wait at all, so are trapped in transfer
- * method rather than calling awaitFulfill.
- */
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- int spins = shouldSpin(s)
- ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
- : 0;
- for (;;) {
- if (w.isInterrupted())
- s.tryCancel();
- SNode m = s.match;
- if (m != null)
- return m;
- if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- s.tryCancel();
- continue;
- }
- }
- if (spins > 0) {
- Thread.onSpinWait();
- spins = shouldSpin(s) ? (spins - 1) : 0;
- }
- else if (s.waiter == null)
- s.waiter = w; // establish waiter so can park next iter
- else if (!timed)
- LockSupport.park(this);
- else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanos);
- }
- }
-
- /**
- * Returns true if node s is at head or there is an active
- * fulfiller.
- */
- boolean shouldSpin(SNode s) {
- SNode h = head;
- return (h == s || h == null || isFulfilling(h.mode));
- }
-
- /**
* Unlinks s from the stack.
*/
void clean(SNode s) {
s.item = null; // forget item
- s.waiter = null; // forget thread
+ s.forgetWaiter();
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
@@ -531,11 +500,11 @@
* nodes, and matching is done by CAS'ing QNode.item field
* from non-null to null (for put) or vice versa (for take).
*/
/** Node class for TransferQueue. */
- static final class QNode {
+ static final class QNode implements ForkJoinPool.ManagedBlocker {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
@@ -555,12 +524,12 @@
}
/**
* Tries to cancel by CAS'ing ref to this as item.
*/
- void tryCancel(Object cmp) {
- QITEM.compareAndSet(this, cmp, this);
+ boolean tryCancel(Object cmp) {
+ return QITEM.compareAndSet(this, cmp, this);
}
boolean isCancelled() {
return item == this;
}
@@ -572,18 +541,40 @@
*/
boolean isOffList() {
return next == this;
}
+ void forgetWaiter() {
+ QWAITER.setOpaque(this, null);
+ }
+
+ boolean isFulfilled() {
+ Object x;
+ return isData == ((x = item) == null) || x == this;
+ }
+
+ public final boolean isReleasable() {
+ Object x;
+ return isData == ((x = item) == null) || x == this ||
+ Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+ while (!isReleasable()) LockSupport.park();
+ return true;
+ }
+
// VarHandle mechanics
private static final VarHandle QITEM;
private static final VarHandle QNEXT;
+ private static final VarHandle QWAITER;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
QITEM = l.findVarHandle(QNode.class, "item", Object.class);
QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
+ QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}
@@ -661,114 +652,89 @@
* than having them implicitly interspersed.
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
-
for (;;) {
- QNode t = tail;
- QNode h = head;
- if (t == null || h == null) // saw uninitialized value
- continue; // spin
-
- if (h == t || t.isData == isData) { // empty or same-mode
- QNode tn = t.next;
- if (t != tail) // inconsistent read
- continue;
- if (tn != null) { // lagging tail
+ QNode t = tail, h = head, m, tn; // m is node to fulfill
+ if (t == null || h == null)
+ ; // inconsistent
+ else if (h == t || t.isData == isData) { // empty or same-mode
+ if (t != tail) // inconsistent
+ ;
+ else if ((tn = t.next) != null) // lagging tail
advanceTail(t, tn);
- continue;
- }
- if (timed && nanos <= 0L) // can't wait
+ else if (timed && nanos <= 0L) // can't wait
return null;
- if (s == null)
- s = new QNode(e, isData);
- if (!t.casNext(null, s)) // failed to link in
- continue;
-
- advanceTail(t, s); // swing tail and wait
- Object x = awaitFulfill(s, e, timed, nanos);
- if (x == s) { // wait was cancelled
+ else if (t.casNext(null, (s != null) ? s :
+ (s = new QNode(e, isData)))) {
+ advanceTail(t, s);
+ long deadline = timed ? System.nanoTime() + nanos : 0L;
+ Thread w = Thread.currentThread();
+ int stat = -1; // same idea as TransferStack
+ Object item;
+ while ((item = s.item) == e) {
+ if ((timed &&
+ (nanos = deadline - System.nanoTime()) <= 0) ||
+ w.isInterrupted()) {
+ if (s.tryCancel(e)) {
clean(t, s);
return null;
}
-
+ } else if ((item = s.item) != e) {
+ break; // recheck
+ } else if (stat <= 0) {
+ if (t.next == s) {
+ if (stat < 0 && t.isFulfilled()) {
+ stat = 0; // yield once if first
+ Thread.yield();
+ }
+ else {
+ stat = 1;
+ s.waiter = w;
+ }
+ }
+ } else if (!timed) {
+ LockSupport.setCurrentBlocker(this);
+ try {
+ ForkJoinPool.managedBlock(s);
+ } catch (InterruptedException cannotHappen) { }
+ LockSupport.setCurrentBlocker(null);
+ }
+ else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+ LockSupport.parkNanos(this, nanos);
+ }
+ if (stat == 1)
+ s.forgetWaiter();
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
- if (x != null) // and forget fields
+ if (item != null) // and forget fields
s.item = s;
- s.waiter = null;
}
- return (x != null) ? (E)x : e;
-
- } else { // complementary-mode
- QNode m = h.next; // node to fulfill
- if (t != tail || m == null || h != head)
- continue; // inconsistent read
-
- Object x = m.item;
- if (isData == (x != null) || // m already fulfilled
- x == m || // m cancelled
- !m.casItem(x, e)) { // lost CAS
- advanceHead(h, m); // dequeue and retry
- continue;
+ return (item != null) ? (E)item : e;
}
- advanceHead(h, m); // successfully fulfilled
- LockSupport.unpark(m.waiter);
+ } else if ((m = h.next) != null && t == tail && h == head) {
+ Thread waiter;
+ Object x = m.item;
+ boolean fulfilled = ((isData == (x == null)) &&
+ x != m && m.casItem(x, e));
+ advanceHead(h, m); // (help) dequeue
+ if (fulfilled) {
+ if ((waiter = m.waiter) != null)
+ LockSupport.unpark(waiter);
return (x != null) ? (E)x : e;
}
}
}
-
- /**
- * Spins/blocks until node s is fulfilled.
- *
- * @param s the waiting node
- * @param e the comparison value for checking match
- * @param timed true if timed wait
- * @param nanos timeout value
- * @return matched item, or s if cancelled
- */
- Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
- /* Same idea as TransferStack.awaitFulfill */
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- int spins = (head.next == s)
- ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
- : 0;
- for (;;) {
- if (w.isInterrupted())
- s.tryCancel(e);
- Object x = s.item;
- if (x != e)
- return x;
- if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- s.tryCancel(e);
- continue;
- }
- }
- if (spins > 0) {
- --spins;
- Thread.onSpinWait();
- }
- else if (s.waiter == null)
- s.waiter = w;
- else if (!timed)
- LockSupport.park(this);
- else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
- LockSupport.parkNanos(this, nanos);
- }
}
/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
- s.waiter = null; // forget thread
+ s.forgetWaiter();
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
< prev index next >