149 * Cleaning is done in different ways in queues vs stacks. For
150 * queues, we can almost always remove a node immediately in O(1)
151 * time (modulo retries for consistency checks) when it is
152 * cancelled. But if it may be pinned as the current tail, it must
153 * wait until some subsequent cancellation. For stacks, we need a
154 * potentially O(n) traversal to be sure that we can remove the
155 * node, but this can run concurrently with other threads
156 * accessing the stack.
157 *
158 * While garbage collection takes care of most node reclamation
159 * issues that otherwise complicate nonblocking algorithms, care
160 * is taken to "forget" references to data, other nodes, and
161 * threads that might be held on to long-term by blocked
162 * threads. In cases where setting to null would otherwise
163 * conflict with main algorithms, this is done by changing a
164 * node's link to now point to the node itself. This doesn't arise
165 * much for Stack nodes (because blocked threads do not hang on to
166 * old head pointers), but references in Queue nodes must be
167 * aggressively forgotten to avoid reachability of everything any
168 * node has ever referred to since arrival.
169 */
170
171 /**
172 * Shared internal API for dual stacks and queues.
173 */
174 abstract static class Transferer<E> {
175 /**
176 * Performs a put or take.
177 *
178 * @param e if non-null, the item to be handed to a consumer;
179 * if null, requests that transfer return an item
180 * offered by producer.
181 * @param timed if this operation should timeout
182 * @param nanos the timeout, in nanoseconds
183 * @return if non-null, the item provided or received; if null,
184 * the operation failed due to timeout or interrupt --
185 * the caller can distinguish which of these occurred
186 * by checking Thread.interrupted.
187 */
188 abstract E transfer(E e, boolean timed, long nanos);
189 }
190
191 /**
192 * The number of times to spin before blocking in timed waits.
193 * The value is empirically derived -- it works well across a
194 * variety of processors and OSes. Empirically, the best value
195 * seems not to vary with number of CPUs (beyond 2) so is just
196 * a constant.
197 */
198 static final int MAX_TIMED_SPINS =
199 (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
200
201 /**
202 * The number of times to spin before blocking in untimed waits.
203 * This is greater than timed value because untimed waits spin
204 * faster since they don't need to check times on each spin.
205 */
206 static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
207
208 /**
209 * The number of nanoseconds for which it is faster to spin
210 * rather than to use timed park. A rough estimate suffices.
211 */
212 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
213
214 /** Dual stack */
215 static final class TransferStack<E> extends Transferer<E> {
216 /*
217 * This extends Scherer-Scott dual stack algorithm, differing,
218 * among other ways, by using "covering" nodes rather than
219 * bit-marked pointers: Fulfilling operations push on marker
220 * nodes (with FULFILLING bit set in mode) to reserve a spot
221 * to match a waiting node.
222 */
223
224 /* Modes for SNodes, ORed together in node fields */
225 /** Node represents an unfulfilled consumer */
226 static final int REQUEST = 0;
227 /** Node represents an unfulfilled producer */
228 static final int DATA = 1;
229 /** Node is fulfilling another unfulfilled DATA or REQUEST */
230 static final int FULFILLING = 2;
231
232 /** Returns true if m has fulfilling bit set. */
233 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
234
235 /** Node class for TransferStacks. */
236 static final class SNode {
237 volatile SNode next; // next node in stack
238 volatile SNode match; // the node matched to this
239 volatile Thread waiter; // to control park/unpark
240 Object item; // data; or null for REQUESTs
241 int mode;
242 // Note: item and mode fields don't need to be volatile
243 // since they are always written before, and read after,
244 // other volatile/atomic operations.
245
246 SNode(Object item) {
247 this.item = item;
248 }
249
250 boolean casNext(SNode cmp, SNode val) {
251 return cmp == next &&
252 SNEXT.compareAndSet(this, cmp, val);
253 }
254
255 /**
256 * Tries to match node s to this node, if so, waking up thread.
257 * Fulfillers call tryMatch to identify their waiters.
258 * Waiters block until they have been matched.
259 *
260 * @param s the node to match
261 * @return true if successfully matched to s
262 */
263 boolean tryMatch(SNode s) {
264 if (match == null &&
265 SMATCH.compareAndSet(this, null, s)) {
266 Thread w = waiter;
267 if (w != null) { // waiters need at most one unpark
268 waiter = null;
269 LockSupport.unpark(w);
270 }
271 return true;
272 }
273 return match == s;
274 }
275
276 /**
277 * Tries to cancel a wait by matching node to itself.
278 */
279 void tryCancel() {
280 SMATCH.compareAndSet(this, null, this);
281 }
282
283 boolean isCancelled() {
284 return match == this;
285 }
286
287 // VarHandle mechanics
288 private static final VarHandle SMATCH;
289 private static final VarHandle SNEXT;
290 static {
291 try {
292 MethodHandles.Lookup l = MethodHandles.lookup();
293 SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
294 SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
295 } catch (ReflectiveOperationException e) {
296 throw new ExceptionInInitializerError(e);
297 }
298 }
299 }
300
301 /** The head (top) of the stack */
302 volatile SNode head;
303
304 boolean casHead(SNode h, SNode nh) {
305 return h == head &&
306 SHEAD.compareAndSet(this, h, nh);
307 }
308
309 /**
310 * Creates or resets fields of a node. Called only from transfer
311 * where the node to push on stack is lazily created and
312 * reused when possible to help reduce intervals between reads
313 * and CASes of head and to avoid surges of garbage when CASes
314 * to push nodes fail due to contention.
341 *
342 * 3. If top of stack already holds another fulfilling node,
343 * help it out by doing its match and/or pop
344 * operations, and then continue. The code for helping
345 * is essentially the same as for fulfilling, except
346 * that it doesn't return the item.
347 */
348
349 SNode s = null; // constructed/reused as needed
350 int mode = (e == null) ? REQUEST : DATA;
351
352 for (;;) {
353 SNode h = head;
354 if (h == null || h.mode == mode) { // empty or same-mode
355 if (timed && nanos <= 0L) { // can't wait
356 if (h != null && h.isCancelled())
357 casHead(h, h.next); // pop cancelled node
358 else
359 return null;
360 } else if (casHead(h, s = snode(s, e, h, mode))) {
361 SNode m = awaitFulfill(s, timed, nanos);
362 if (m == s) { // wait was cancelled
363 clean(s);
364 return null;
365 }
366 if ((h = head) != null && h.next == s)
367 casHead(h, s.next); // help s's fulfiller
368 return (E) ((mode == REQUEST) ? m.item : s.item);
369 }
370 } else if (!isFulfilling(h.mode)) { // try to fulfill
371 if (h.isCancelled()) // already cancelled
372 casHead(h, h.next); // pop and retry
373 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
374 for (;;) { // loop until matched or waiters disappear
375 SNode m = s.next; // m is s's match
376 if (m == null) { // all waiters are gone
377 casHead(s, null); // pop fulfill node
378 s = null; // use new node next time
379 break; // restart main loop
380 }
381 SNode mn = m.next;
382 if (m.tryMatch(s)) {
383 casHead(s, mn); // pop both s and m
384 return (E) ((mode == REQUEST) ? m.item : s.item);
385 } else // lost match
386 s.casNext(m, mn); // help unlink
387 }
388 }
389 } else { // help a fulfiller
390 SNode m = h.next; // m is h's match
391 if (m == null) // waiter is gone
392 casHead(h, null); // pop fulfilling node
393 else {
394 SNode mn = m.next;
395 if (m.tryMatch(h)) // help match
396 casHead(h, mn); // pop both h and m
397 else // lost match
398 h.casNext(m, mn); // help unlink
399 }
400 }
401 }
402 }
403
404 /**
405 * Spins/blocks until node s is matched by a fulfill operation.
406 *
407 * @param s the waiting node
408 * @param timed true if timed wait
409 * @param nanos timeout value
410 * @return matched node, or s if cancelled
411 */
412 SNode awaitFulfill(SNode s, boolean timed, long nanos) {
413 /*
414 * When a node/thread is about to block, it sets its waiter
415 * field and then rechecks state at least one more time
416 * before actually parking, thus covering race vs
417 * fulfiller noticing that waiter is non-null so should be
418 * woken.
419 *
420 * When invoked by nodes that appear at the point of call
421 * to be at the head of the stack, calls to park are
422 * preceded by spins to avoid blocking when producers and
423 * consumers are arriving very close in time. This can
424 * happen enough to bother only on multiprocessors.
425 *
426 * The order of checks for returning out of main loop
427 * reflects fact that interrupts have precedence over
428 * normal returns, which have precedence over
429 * timeouts. (So, on timeout, one last check for match is
430 * done before giving up.) Except that calls from untimed
431 * SynchronousQueue.{poll/offer} don't check interrupts
432 * and don't wait at all, so are trapped in transfer
433 * method rather than calling awaitFulfill.
434 */
435 final long deadline = timed ? System.nanoTime() + nanos : 0L;
436 Thread w = Thread.currentThread();
437 int spins = shouldSpin(s)
438 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
439 : 0;
440 for (;;) {
441 if (w.isInterrupted())
442 s.tryCancel();
443 SNode m = s.match;
444 if (m != null)
445 return m;
446 if (timed) {
447 nanos = deadline - System.nanoTime();
448 if (nanos <= 0L) {
449 s.tryCancel();
450 continue;
451 }
452 }
453 if (spins > 0) {
454 Thread.onSpinWait();
455 spins = shouldSpin(s) ? (spins - 1) : 0;
456 }
457 else if (s.waiter == null)
458 s.waiter = w; // establish waiter so can park next iter
459 else if (!timed)
460 LockSupport.park(this);
461 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
462 LockSupport.parkNanos(this, nanos);
463 }
464 }
465
466 /**
467 * Returns true if node s is at head or there is an active
468 * fulfiller.
469 */
470 boolean shouldSpin(SNode s) {
471 SNode h = head;
472 return (h == s || h == null || isFulfilling(h.mode));
473 }
474
475 /**
476 * Unlinks s from the stack.
477 */
478 void clean(SNode s) {
479 s.item = null; // forget item
480 s.waiter = null; // forget thread
481
482 /*
483 * At worst we may need to traverse entire stack to unlink
484 * s. If there are multiple concurrent calls to clean, we
485 * might not see s if another thread has already removed
486 * it. But we can stop when we see any node known to
487 * follow s. We use s.next unless it too is cancelled, in
488 * which case we try the node one past. We don't check any
489 * further because we don't want to doubly traverse just to
490 * find sentinel.
491 */
492
493 SNode past = s.next;
494 if (past != null && past.isCancelled())
495 past = past.next;
496
497 // Absorb cancelled nodes at head
498 SNode p;
499 while ((p = head) != null && p != past && p.isCancelled())
500 casHead(p, p.next);
516 MethodHandles.Lookup l = MethodHandles.lookup();
517 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
518 } catch (ReflectiveOperationException e) {
519 throw new ExceptionInInitializerError(e);
520 }
521 }
522 }
523
524 /** Dual Queue */
525 static final class TransferQueue<E> extends Transferer<E> {
526 /*
527 * This extends Scherer-Scott dual queue algorithm, differing,
528 * among other ways, by using modes within nodes rather than
529 * marked pointers. The algorithm is a little simpler than
530 * that for stacks because fulfillers do not need explicit
531 * nodes, and matching is done by CAS'ing QNode.item field
532 * from non-null to null (for put) or vice versa (for take).
533 */
534
535 /** Node class for TransferQueue. */
536 static final class QNode {
537 volatile QNode next; // next node in queue
538 volatile Object item; // CAS'ed to or from null
539 volatile Thread waiter; // to control park/unpark
540 final boolean isData;
541
542 QNode(Object item, boolean isData) {
543 this.item = item;
544 this.isData = isData;
545 }
546
547 boolean casNext(QNode cmp, QNode val) {
548 return next == cmp &&
549 QNEXT.compareAndSet(this, cmp, val);
550 }
551
552 boolean casItem(Object cmp, Object val) {
553 return item == cmp &&
554 QITEM.compareAndSet(this, cmp, val);
555 }
556
557 /**
558 * Tries to cancel by CAS'ing ref to this as item.
559 */
560 void tryCancel(Object cmp) {
561 QITEM.compareAndSet(this, cmp, this);
562 }
563
564 boolean isCancelled() {
565 return item == this;
566 }
567
568 /**
569 * Returns true if this node is known to be off the queue
570 * because its next pointer has been forgotten due to
571 * an advanceHead operation.
572 */
573 boolean isOffList() {
574 return next == this;
575 }
576
577 // VarHandle mechanics
578 private static final VarHandle QITEM;
579 private static final VarHandle QNEXT;
580 static {
581 try {
582 MethodHandles.Lookup l = MethodHandles.lookup();
583 QITEM = l.findVarHandle(QNode.class, "item", Object.class);
584 QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
585 } catch (ReflectiveOperationException e) {
586 throw new ExceptionInInitializerError(e);
587 }
588 }
589 }
590
591 /** Head of queue */
592 transient volatile QNode head;
593 /** Tail of queue */
594 transient volatile QNode tail;
595 /**
596 * Reference to a cancelled node that might not yet have been
597 * unlinked from queue because it was the last inserted node
598 * when it was cancelled.
599 */
600 transient volatile QNode cleanMe;
601
602 TransferQueue() {
603 QNode h = new QNode(null, false); // initialize to dummy node.
604 head = h;
646 * 2. If queue apparently contains waiting items, and this
647 * call is of complementary mode, try to fulfill by CAS'ing
648 * item field of waiting node and dequeuing it, and then
649 * returning matching item.
650 *
651 * In each case, along the way, check for and try to help
652 * advance head and tail on behalf of other stalled/slow
653 * threads.
654 *
655 * The loop starts off with a null check guarding against
656 * seeing uninitialized head or tail values. This never
657 * happens in current SynchronousQueue, but could if
658 * callers held non-volatile/final ref to the
659 * transferer. The check is here anyway because it places
660 * null checks at top of loop, which is usually faster
661 * than having them implicitly interspersed.
662 */
663
664 QNode s = null; // constructed/reused as needed
665 boolean isData = (e != null);
666
667 for (;;) {
668 QNode t = tail;
669 QNode h = head;
670 if (t == null || h == null) // saw uninitialized value
671 continue; // spin
672
673 if (h == t || t.isData == isData) { // empty or same-mode
674 QNode tn = t.next;
675 if (t != tail) // inconsistent read
676 continue;
677 if (tn != null) { // lagging tail
678 advanceTail(t, tn);
679 continue;
680 }
681 if (timed && nanos <= 0L) // can't wait
682 return null;
683 if (s == null)
684 s = new QNode(e, isData);
685 if (!t.casNext(null, s)) // failed to link in
686 continue;
687
688 advanceTail(t, s); // swing tail and wait
689 Object x = awaitFulfill(s, e, timed, nanos);
690 if (x == s) { // wait was cancelled
691 clean(t, s);
692 return null;
693 }
694
695 if (!s.isOffList()) { // not already unlinked
696 advanceHead(t, s); // unlink if head
697 if (x != null) // and forget fields
698 s.item = s;
699 s.waiter = null;
700 }
701 return (x != null) ? (E)x : e;
702
703 } else { // complementary-mode
704 QNode m = h.next; // node to fulfill
705 if (t != tail || m == null || h != head)
706 continue; // inconsistent read
707
708 Object x = m.item;
709 if (isData == (x != null) || // m already fulfilled
710 x == m || // m cancelled
711 !m.casItem(x, e)) { // lost CAS
712 advanceHead(h, m); // dequeue and retry
713 continue;
714 }
715
716 advanceHead(h, m); // successfully fulfilled
717 LockSupport.unpark(m.waiter);
718 return (x != null) ? (E)x : e;
719 }
720 }
721 }
722
723 /**
724 * Spins/blocks until node s is fulfilled.
725 *
726 * @param s the waiting node
727 * @param e the comparison value for checking match
728 * @param timed true if timed wait
729 * @param nanos timeout value
730 * @return matched item, or s if cancelled
731 */
732 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
733 /* Same idea as TransferStack.awaitFulfill */
734 final long deadline = timed ? System.nanoTime() + nanos : 0L;
735 Thread w = Thread.currentThread();
736 int spins = (head.next == s)
737 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
738 : 0;
739 for (;;) {
740 if (w.isInterrupted())
741 s.tryCancel(e);
742 Object x = s.item;
743 if (x != e)
744 return x;
745 if (timed) {
746 nanos = deadline - System.nanoTime();
747 if (nanos <= 0L) {
748 s.tryCancel(e);
749 continue;
750 }
751 }
752 if (spins > 0) {
753 --spins;
754 Thread.onSpinWait();
755 }
756 else if (s.waiter == null)
757 s.waiter = w;
758 else if (!timed)
759 LockSupport.park(this);
760 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
761 LockSupport.parkNanos(this, nanos);
762 }
763 }
764
765 /**
766 * Gets rid of cancelled node s with original predecessor pred.
767 */
768 void clean(QNode pred, QNode s) {
769 s.waiter = null; // forget thread
770 /*
771 * At any given time, exactly one node on list cannot be
772 * deleted -- the last inserted node. To accommodate this,
773 * if we cannot delete s, we save its predecessor as
774 * "cleanMe", deleting the previously saved version
775 * first. At least one of node s or the node previously
776 * saved can always be deleted, so this always terminates.
777 */
778 while (pred.next == s) { // Return early if already unlinked
779 QNode h = head;
780 QNode hn = h.next; // Absorb cancelled first node as head
781 if (hn != null && hn.isCancelled()) {
782 advanceHead(h, hn);
783 continue;
784 }
785 QNode t = tail; // Ensure consistent read for tail
786 if (t == h)
787 return;
788 QNode tn = t.next;
789 if (t != tail)
|
149 * Cleaning is done in different ways in queues vs stacks. For
150 * queues, we can almost always remove a node immediately in O(1)
151 * time (modulo retries for consistency checks) when it is
152 * cancelled. But if it may be pinned as the current tail, it must
153 * wait until some subsequent cancellation. For stacks, we need a
154 * potentially O(n) traversal to be sure that we can remove the
155 * node, but this can run concurrently with other threads
156 * accessing the stack.
157 *
158 * While garbage collection takes care of most node reclamation
159 * issues that otherwise complicate nonblocking algorithms, care
160 * is taken to "forget" references to data, other nodes, and
161 * threads that might be held on to long-term by blocked
162 * threads. In cases where setting to null would otherwise
163 * conflict with main algorithms, this is done by changing a
164 * node's link to now point to the node itself. This doesn't arise
165 * much for Stack nodes (because blocked threads do not hang on to
166 * old head pointers), but references in Queue nodes must be
167 * aggressively forgotten to avoid reachability of everything any
168 * node has ever referred to since arrival.
169 *
170 * The above steps improve throughput when many threads produce
171 * and/or consume data. But they don't help much with
172 * single-source / single-sink usages in which one side or the
173 * other is always transiently blocked, and so throughput is
174 * mainly a function of thread scheduling. This is not usually
175 * noticeably improved with bounded short spin-waits. Instead both
176 * forms of transfer try Thread.yield if apparently the sole
177 * waiter. This works well when there are more tasks than cores,
178 * which is expected to be the main usage context of this mode. In
179 * other cases, waiters may help with some bookkeeping, then
180 * park/unpark.
181 */
182
183 /**
184 * Shared internal API for dual stacks and queues.
185 */
186 abstract static class Transferer<E> {
187 /**
188 * Performs a put or take.
189 *
190 * @param e if non-null, the item to be handed to a consumer;
191 * if null, requests that transfer return an item
192 * offered by producer.
193 * @param timed if this operation should timeout
194 * @param nanos the timeout, in nanoseconds
195 * @return if non-null, the item provided or received; if null,
196 * the operation failed due to timeout or interrupt --
197 * the caller can distinguish which of these occurred
198 * by checking Thread.interrupted.
199 */
200 abstract E transfer(E e, boolean timed, long nanos);
201 }
202
203 /**
204 * The number of nanoseconds for which it is faster to spin
205 * rather than to use timed park. A rough estimate suffices.
206 */
207 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
208
209 /** Dual stack */
210 static final class TransferStack<E> extends Transferer<E> {
211 /*
212 * This extends Scherer-Scott dual stack algorithm, differing,
213 * among other ways, by using "covering" nodes rather than
214 * bit-marked pointers: Fulfilling operations push on marker
215 * nodes (with FULFILLING bit set in mode) to reserve a spot
216 * to match a waiting node.
217 */
218
219 /* Modes for SNodes, ORed together in node fields */
220 /** Node represents an unfulfilled consumer */
221 static final int REQUEST = 0;
222 /** Node represents an unfulfilled producer */
223 static final int DATA = 1;
224 /** Node is fulfilling another unfulfilled DATA or REQUEST */
225 static final int FULFILLING = 2;
226
227 /** Returns true if m has fulfilling bit set. */
228 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
229
230 /** Node class for TransferStacks. */
231 static final class SNode implements ForkJoinPool.ManagedBlocker {
232 volatile SNode next; // next node in stack
233 volatile SNode match; // the node matched to this
234 volatile Thread waiter; // to control park/unpark
235 Object item; // data; or null for REQUESTs
236 int mode;
237 // Note: item and mode fields don't need to be volatile
238 // since they are always written before, and read after,
239 // other volatile/atomic operations.
240
241 SNode(Object item) {
242 this.item = item;
243 }
244
245 boolean casNext(SNode cmp, SNode val) {
246 return cmp == next &&
247 SNEXT.compareAndSet(this, cmp, val);
248 }
249
250 /**
251 * Tries to match node s to this node, if so, waking up thread.
252 * Fulfillers call tryMatch to identify their waiters.
253 * Waiters block until they have been matched.
254 *
255 * @param s the node to match
256 * @return true if successfully matched to s
257 */
258 boolean tryMatch(SNode s) {
259 SNode m; Thread w;
260 if ((m = match) == null) {
261 if (SMATCH.compareAndSet(this, null, s)) {
262 if ((w = waiter) != null)
263 LockSupport.unpark(w);
264 return true;
265 }
266 else
267 m = match;
268 }
269 return m == s;
270 }
271
272 /**
273 * Tries to cancel a wait by matching node to itself.
274 */
275 boolean tryCancel() {
276 return SMATCH.compareAndSet(this, null, this);
277 }
278
279 boolean isCancelled() {
280 return match == this;
281 }
282
283 public final boolean isReleasable() {
284 return match != null || Thread.currentThread().isInterrupted();
285 }
286
287 public final boolean block() {
288 while (!isReleasable()) LockSupport.park();
289 return true;
290 }
291
292 void forgetWaiter() {
293 SWAITER.setOpaque(this, null);
294 }
295
296 // VarHandle mechanics
297 private static final VarHandle SMATCH;
298 private static final VarHandle SNEXT;
299 private static final VarHandle SWAITER;
300 static {
301 try {
302 MethodHandles.Lookup l = MethodHandles.lookup();
303 SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
304 SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
305 SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
306 } catch (ReflectiveOperationException e) {
307 throw new ExceptionInInitializerError(e);
308 }
309 }
310 }
311
312 /** The head (top) of the stack */
313 volatile SNode head;
314
315 boolean casHead(SNode h, SNode nh) {
316 return h == head &&
317 SHEAD.compareAndSet(this, h, nh);
318 }
319
320 /**
321 * Creates or resets fields of a node. Called only from transfer
322 * where the node to push on stack is lazily created and
323 * reused when possible to help reduce intervals between reads
324 * and CASes of head and to avoid surges of garbage when CASes
325 * to push nodes fail due to contention.
352 *
353 * 3. If top of stack already holds another fulfilling node,
354 * help it out by doing its match and/or pop
355 * operations, and then continue. The code for helping
356 * is essentially the same as for fulfilling, except
357 * that it doesn't return the item.
358 */
359
360 SNode s = null; // constructed/reused as needed
361 int mode = (e == null) ? REQUEST : DATA;
362
363 for (;;) {
364 SNode h = head;
365 if (h == null || h.mode == mode) { // empty or same-mode
366 if (timed && nanos <= 0L) { // can't wait
367 if (h != null && h.isCancelled())
368 casHead(h, h.next); // pop cancelled node
369 else
370 return null;
371 } else if (casHead(h, s = snode(s, e, h, mode))) {
372 long deadline = timed ? System.nanoTime() + nanos : 0L;
373 Thread w = Thread.currentThread();
374 int stat = -1; // -1: may yield, +1: park, else 0
375 SNode m; // await fulfill or cancel
376 while ((m = s.match) == null) {
377 if ((timed &&
378 (nanos = deadline - System.nanoTime()) <= 0) ||
379 w.isInterrupted()) {
380 if (s.tryCancel()) {
381 clean(s); // wait cancelled
382 return null;
383 }
384 } else if ((m = s.match) != null) {
385 break; // recheck
386 } else if (stat <= 0) {
387 if (stat < 0 && h == null && head == s) {
388 stat = 0; // yield once if was empty
389 Thread.yield();
390 } else {
391 stat = 1;
392 s.waiter = w; // enable signal
393 }
394 } else if (!timed) {
395 LockSupport.setCurrentBlocker(this);
396 try {
397 ForkJoinPool.managedBlock(s);
398 } catch (InterruptedException cannotHappen) { }
399 LockSupport.setCurrentBlocker(null);
400 } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
401 LockSupport.parkNanos(this, nanos);
402 }
403 if (stat == 1)
404 s.forgetWaiter();
405 Object result = (mode == REQUEST) ? m.item : s.item;
406 if (h != null && h.next == s)
407 casHead(h, s.next); // help fulfiller
408 return (E) result;
409 }
410 } else if (!isFulfilling(h.mode)) { // try to fulfill
411 if (h.isCancelled()) // already cancelled
412 casHead(h, h.next); // pop and retry
413 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
414 for (;;) { // loop until matched or waiters disappear
415 SNode m = s.next; // m is s's match
416 if (m == null) { // all waiters are gone
417 casHead(s, null); // pop fulfill node
418 s = null; // use new node next time
419 break; // restart main loop
420 }
421 SNode mn = m.next;
422 if (m.tryMatch(s)) {
423 casHead(s, mn); // pop both s and m
424 return (E) ((mode == REQUEST) ? m.item : s.item);
425 } else // lost match
426 s.casNext(m, mn); // help unlink
427 }
428 }
429 } else { // help a fulfiller
430 SNode m = h.next; // m is h's match
431 if (m == null) // waiter is gone
432 casHead(h, null); // pop fulfilling node
433 else {
434 SNode mn = m.next;
435 if (m.tryMatch(h)) // help match
436 casHead(h, mn); // pop both h and m
437 else // lost match
438 h.casNext(m, mn); // help unlink
439 }
440 }
441 }
442 }
443
444 /**
445 * Unlinks s from the stack.
446 */
447 void clean(SNode s) {
448 s.item = null; // forget item
449 s.forgetWaiter();
450
451 /*
452 * At worst we may need to traverse entire stack to unlink
453 * s. If there are multiple concurrent calls to clean, we
454 * might not see s if another thread has already removed
455 * it. But we can stop when we see any node known to
456 * follow s. We use s.next unless it too is cancelled, in
457 * which case we try the node one past. We don't check any
458 * further because we don't want to doubly traverse just to
459 * find sentinel.
460 */
461
462 SNode past = s.next;
463 if (past != null && past.isCancelled())
464 past = past.next;
465
466 // Absorb cancelled nodes at head
467 SNode p;
468 while ((p = head) != null && p != past && p.isCancelled())
469 casHead(p, p.next);
485 MethodHandles.Lookup l = MethodHandles.lookup();
486 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
487 } catch (ReflectiveOperationException e) {
488 throw new ExceptionInInitializerError(e);
489 }
490 }
491 }
492
493 /** Dual Queue */
494 static final class TransferQueue<E> extends Transferer<E> {
495 /*
496 * This extends Scherer-Scott dual queue algorithm, differing,
497 * among other ways, by using modes within nodes rather than
498 * marked pointers. The algorithm is a little simpler than
499 * that for stacks because fulfillers do not need explicit
500 * nodes, and matching is done by CAS'ing QNode.item field
501 * from non-null to null (for put) or vice versa (for take).
502 */
503
504 /** Node class for TransferQueue. */
505 static final class QNode implements ForkJoinPool.ManagedBlocker {
506 volatile QNode next; // next node in queue
507 volatile Object item; // CAS'ed to or from null
508 volatile Thread waiter; // to control park/unpark
509 final boolean isData;
510
511 QNode(Object item, boolean isData) {
512 this.item = item;
513 this.isData = isData;
514 }
515
516 boolean casNext(QNode cmp, QNode val) {
517 return next == cmp &&
518 QNEXT.compareAndSet(this, cmp, val);
519 }
520
521 boolean casItem(Object cmp, Object val) {
522 return item == cmp &&
523 QITEM.compareAndSet(this, cmp, val);
524 }
525
526 /**
527 * Tries to cancel by CAS'ing ref to this as item.
528 */
529 boolean tryCancel(Object cmp) {
530 return QITEM.compareAndSet(this, cmp, this);
531 }
532
533 boolean isCancelled() {
534 return item == this;
535 }
536
537 /**
538 * Returns true if this node is known to be off the queue
539 * because its next pointer has been forgotten due to
540 * an advanceHead operation.
541 */
542 boolean isOffList() {
543 return next == this;
544 }
545
546 void forgetWaiter() {
547 QWAITER.setOpaque(this, null);
548 }
549
550 boolean isFulfilled() {
551 Object x;
552 return isData == ((x = item) == null) || x == this;
553 }
554
555 public final boolean isReleasable() {
556 Object x;
557 return isData == ((x = item) == null) || x == this ||
558 Thread.currentThread().isInterrupted();
559 }
560
561 public final boolean block() {
562 while (!isReleasable()) LockSupport.park();
563 return true;
564 }
565
566 // VarHandle mechanics
567 private static final VarHandle QITEM;
568 private static final VarHandle QNEXT;
569 private static final VarHandle QWAITER;
570 static {
571 try {
572 MethodHandles.Lookup l = MethodHandles.lookup();
573 QITEM = l.findVarHandle(QNode.class, "item", Object.class);
574 QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
575 QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
576 } catch (ReflectiveOperationException e) {
577 throw new ExceptionInInitializerError(e);
578 }
579 }
580 }
581
582 /** Head of queue */
583 transient volatile QNode head;
584 /** Tail of queue */
585 transient volatile QNode tail;
586 /**
587 * Reference to a cancelled node that might not yet have been
588 * unlinked from queue because it was the last inserted node
589 * when it was cancelled.
590 */
591 transient volatile QNode cleanMe;
592
593 TransferQueue() {
594 QNode h = new QNode(null, false); // initialize to dummy node.
595 head = h;
637 * 2. If queue apparently contains waiting items, and this
638 * call is of complementary mode, try to fulfill by CAS'ing
639 * item field of waiting node and dequeuing it, and then
640 * returning matching item.
641 *
642 * In each case, along the way, check for and try to help
643 * advance head and tail on behalf of other stalled/slow
644 * threads.
645 *
646 * The loop starts off with a null check guarding against
647 * seeing uninitialized head or tail values. This never
648 * happens in current SynchronousQueue, but could if
649 * callers held non-volatile/final ref to the
650 * transferer. The check is here anyway because it places
651 * null checks at top of loop, which is usually faster
652 * than having them implicitly interspersed.
653 */
654
655 QNode s = null; // constructed/reused as needed
656 boolean isData = (e != null);
657 for (;;) {
658 QNode t = tail, h = head, m, tn; // m is node to fulfill
659 if (t == null || h == null)
660 ; // inconsistent
661 else if (h == t || t.isData == isData) { // empty or same-mode
662 if (t != tail) // inconsistent
663 ;
664 else if ((tn = t.next) != null) // lagging tail
665 advanceTail(t, tn);
666 else if (timed && nanos <= 0L) // can't wait
667 return null;
668 else if (t.casNext(null, (s != null) ? s :
669 (s = new QNode(e, isData)))) {
670 advanceTail(t, s);
671 long deadline = timed ? System.nanoTime() + nanos : 0L;
672 Thread w = Thread.currentThread();
673 int stat = -1; // same idea as TransferStack
674 Object item;
675 while ((item = s.item) == e) {
676 if ((timed &&
677 (nanos = deadline - System.nanoTime()) <= 0) ||
678 w.isInterrupted()) {
679 if (s.tryCancel(e)) {
680 clean(t, s);
681 return null;
682 }
683 } else if ((item = s.item) != e) {
684 break; // recheck
685 } else if (stat <= 0) {
686 if (t.next == s) {
687 if (stat < 0 && t.isFulfilled()) {
688 stat = 0; // yield once if first
689 Thread.yield();
690 }
691 else {
692 stat = 1;
693 s.waiter = w;
694 }
695 }
696 } else if (!timed) {
697 LockSupport.setCurrentBlocker(this);
698 try {
699 ForkJoinPool.managedBlock(s);
700 } catch (InterruptedException cannotHappen) { }
701 LockSupport.setCurrentBlocker(null);
702 }
703 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
704 LockSupport.parkNanos(this, nanos);
705 }
706 if (stat == 1)
707 s.forgetWaiter();
708 if (!s.isOffList()) { // not already unlinked
709 advanceHead(t, s); // unlink if head
710 if (item != null) // and forget fields
711 s.item = s;
712 }
713 return (item != null) ? (E)item : e;
714 }
715
716 } else if ((m = h.next) != null && t == tail && h == head) {
717 Thread waiter;
718 Object x = m.item;
719 boolean fulfilled = ((isData == (x == null)) &&
720 x != m && m.casItem(x, e));
721 advanceHead(h, m); // (help) dequeue
722 if (fulfilled) {
723 if ((waiter = m.waiter) != null)
724 LockSupport.unpark(waiter);
725 return (x != null) ? (E)x : e;
726 }
727 }
728 }
729 }
730
731 /**
732 * Gets rid of cancelled node s with original predecessor pred.
733 */
734 void clean(QNode pred, QNode s) {
735 s.forgetWaiter();
736 /*
737 * At any given time, exactly one node on list cannot be
738 * deleted -- the last inserted node. To accommodate this,
739 * if we cannot delete s, we save its predecessor as
740 * "cleanMe", deleting the previously saved version
741 * first. At least one of node s or the node previously
742 * saved can always be deleted, so this always terminates.
743 */
744 while (pred.next == s) { // Return early if already unlinked
745 QNode h = head;
746 QNode hn = h.next; // Absorb cancelled first node as head
747 if (hn != null && hn.isCancelled()) {
748 advanceHead(h, hn);
749 continue;
750 }
751 QNode t = tail; // Ensure consistent read for tail
752 if (t == h)
753 return;
754 QNode tn = t.next;
755 if (t != tail)
|