< prev index next >

src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java

Print this page
8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: martin


 149      * Cleaning is done in different ways in queues vs stacks.  For
 150      * queues, we can almost always remove a node immediately in O(1)
 151      * time (modulo retries for consistency checks) when it is
 152      * cancelled. But if it may be pinned as the current tail, it must
 153      * wait until some subsequent cancellation. For stacks, we need a
 154      * potentially O(n) traversal to be sure that we can remove the
 155      * node, but this can run concurrently with other threads
 156      * accessing the stack.
 157      *
 158      * While garbage collection takes care of most node reclamation
 159      * issues that otherwise complicate nonblocking algorithms, care
 160      * is taken to "forget" references to data, other nodes, and
 161      * threads that might be held on to long-term by blocked
 162      * threads. In cases where setting to null would otherwise
 163      * conflict with main algorithms, this is done by changing a
 164      * node's link to now point to the node itself. This doesn't arise
 165      * much for Stack nodes (because blocked threads do not hang on to
 166      * old head pointers), but references in Queue nodes must be
 167      * aggressively forgotten to avoid reachability of everything any
 168      * node has ever referred to since arrival.












 169      */
 170 
 171     /**
 172      * Shared internal API for dual stacks and queues.
 173      */
         // Extended by TransferStack (LIFO order) and TransferQueue (FIFO
         // order) later in this file; both implement the single transfer
         // method, which unifies put and take into one dual-data-structure
         // operation.
 174     abstract static class Transferer<E> {
 175         /**
 176          * Performs a put or take.
 177          *
 178          * @param e if non-null, the item to be handed to a consumer;
 179          *          if null, requests that transfer return an item
 180          *          offered by producer.
 181          * @param timed if this operation should timeout
 182          * @param nanos the timeout, in nanoseconds
 183          * @return if non-null, the item provided or received; if null,
 184          *         the operation failed due to timeout or interrupt --
 185          *         the caller can distinguish which of these occurred
 186          *         by checking Thread.interrupted.
 187          */
 188         abstract E transfer(E e, boolean timed, long nanos);
 189     }
 190 
 191     /**
 192      * The number of times to spin before blocking in timed waits.
 193      * The value is empirically derived -- it works well across a
 194      * variety of processors and OSes. Empirically, the best value
 195      * seems not to vary with number of CPUs (beyond 2) so is just
 196      * a constant.
 197      */
     // Zero on uniprocessors: with no second CPU to run a fulfilling
     // partner, spinning cannot succeed and would only waste the quantum.
 198     static final int MAX_TIMED_SPINS =
 199         (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
 200 
 201     /**
 202      * The number of times to spin before blocking in untimed waits.
 203      * This is greater than timed value because untimed waits spin
 204      * faster since they don't need to check times on each spin.
 205      */
 206     static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
 207 
 208     /**
 209      * The number of nanoseconds for which it is faster to spin
 210      * rather than to use timed park. A rough estimate suffices.
 211      */
     // About one microsecond: below this remaining wait, the cost of a
     // timed park/unpark round trip exceeds the wait itself.
 212     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
 213 
 214     /** Dual stack */
 215     static final class TransferStack<E> extends Transferer<E> {
 216         /*
 217          * This extends Scherer-Scott dual stack algorithm, differing,
 218          * among other ways, by using "covering" nodes rather than
 219          * bit-marked pointers: Fulfilling operations push on marker
 220          * nodes (with FULFILLING bit set in mode) to reserve a spot
 221          * to match a waiting node.
 222          */
 223 
 224         /* Modes for SNodes, ORed together in node fields */
 225         /** Node represents an unfulfilled consumer */
 226         static final int REQUEST    = 0;
 227         /** Node represents an unfulfilled producer */
 228         static final int DATA       = 1;
             // FULFILLING is a bit flag: a fulfiller pushes a node whose
             // mode is (FULFILLING | REQUEST) or (FULFILLING | DATA).
 229         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 230         static final int FULFILLING = 2;
 231 
 232         /** Returns true if m has fulfilling bit set. */
 233         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 234 
 235         /** Node class for TransferStacks. */
 236         static final class SNode {
 237             volatile SNode next;        // next node in stack
 238             volatile SNode match;       // the node matched to this
 239             volatile Thread waiter;     // to control park/unpark
 240             Object item;                // data; or null for REQUESTs
 241             int mode;
 242             // Note: item and mode fields don't need to be volatile
 243             // since they are always written before, and read after,
 244             // other volatile/atomic operations.
 245 
 246             SNode(Object item) {
 247                 this.item = item;
 248             }
 249 
             // Cheap volatile pre-read screens out stale expected values
             // before paying for the CAS.
 250             boolean casNext(SNode cmp, SNode val) {
 251                 return cmp == next &&
 252                     SNEXT.compareAndSet(this, cmp, val);
 253             }
 254 
 255             /**
 256              * Tries to match node s to this node, if so, waking up thread.
 257              * Fulfillers call tryMatch to identify their waiters.
 258              * Waiters block until they have been matched.
 259              *
 260              * @param s the node to match
 261              * @return true if successfully matched to s
 262              */
 263             boolean tryMatch(SNode s) {
 264                 if (match == null &&
 265                     SMATCH.compareAndSet(this, null, s)) {
 266                     Thread w = waiter;
 267                     if (w != null) {    // waiters need at most one unpark
 268                         waiter = null;
 269                         LockSupport.unpark(w);
 270                     }
 271                     return true;
 272                 }
                 // CAS lost or node already matched: still report success if
                 // the installed match is s itself (a concurrent helper may
                 // have completed this match on our behalf).
 273                 return match == s;



 274             }
 275 
 276             /**
 277              * Tries to cancel a wait by matching node to itself.
 278              */
             // Self-match is the cancellation sentinel; the CAS fails
             // harmlessly if a fulfiller has already matched this node.
 279             void tryCancel() {
 280                 SMATCH.compareAndSet(this, null, this);
 281             }
 282 
 283             boolean isCancelled() {
 284                 return match == this;
 285             }
 286 













 287             // VarHandle mechanics
 288             private static final VarHandle SMATCH;
 289             private static final VarHandle SNEXT;

 290             static {
 291                 try {
 292                     MethodHandles.Lookup l = MethodHandles.lookup();
 293                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
 294                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);

 295                 } catch (ReflectiveOperationException e) {
 296                     throw new ExceptionInInitializerError(e);
 297                 }
 298             }
 299         }
 300 
 301         /** The head (top) of the stack */
 302         volatile SNode head;
 303 
         // Volatile pre-read of head avoids the cost of a doomed CAS when
         // the expected value h is already stale.
 304         boolean casHead(SNode h, SNode nh) {
 305             return h == head &&
 306                 SHEAD.compareAndSet(this, h, nh);
 307         }
 308 
 309         /**
 310          * Creates or resets fields of a node. Called only from transfer
 311          * where the node to push on stack is lazily created and
 312          * reused when possible to help reduce intervals between reads
 313          * and CASes of head and to avoid surges of garbage when CASes
 314          * to push nodes fail due to contention.


 341              *
 342              * 3. If top of stack already holds another fulfilling node,
 343              *    help it out by doing its match and/or pop
 344              *    operations, and then continue. The code for helping
 345              *    is essentially the same as for fulfilling, except
 346              *    that it doesn't return the item.
 347              */
 348 
 349             SNode s = null; // constructed/reused as needed
 350             int mode = (e == null) ? REQUEST : DATA;
 351 
 352             for (;;) {
 353                 SNode h = head;
 354                 if (h == null || h.mode == mode) {  // empty or same-mode
 355                     if (timed && nanos <= 0L) {     // can't wait
 356                         if (h != null && h.isCancelled())
 357                             casHead(h, h.next);     // pop cancelled node
 358                         else
 359                             return null;
 360                     } else if (casHead(h, s = snode(s, e, h, mode))) {
 361                         SNode m = awaitFulfill(s, timed, nanos);
 362                         if (m == s) {               // wait was cancelled
 363                             clean(s);







 364                             return null;
 365                         }
 366                         if ((h = head) != null && h.next == s)
 367                             casHead(h, s.next);     // help s's fulfiller
 368                         return (E) ((mode == REQUEST) ? m.item : s.item);






















 369                     }
 370                 } else if (!isFulfilling(h.mode)) { // try to fulfill
 371                     if (h.isCancelled())            // already cancelled
 372                         casHead(h, h.next);         // pop and retry
 373                     else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
 374                         for (;;) { // loop until matched or waiters disappear
 375                             SNode m = s.next;       // m is s's match
 376                             if (m == null) {        // all waiters are gone
 377                                 casHead(s, null);   // pop fulfill node
 378                                 s = null;           // use new node next time
 379                                 break;              // restart main loop
 380                             }
 381                             SNode mn = m.next;
 382                             if (m.tryMatch(s)) {
 383                                 casHead(s, mn);     // pop both s and m
 384                                 return (E) ((mode == REQUEST) ? m.item : s.item);
 385                             } else                  // lost match
 386                                 s.casNext(m, mn);   // help unlink
 387                         }
 388                     }
 389                 } else {                            // help a fulfiller
 390                     SNode m = h.next;               // m is h's match
 391                     if (m == null)                  // waiter is gone
 392                         casHead(h, null);           // pop fulfilling node
 393                     else {
 394                         SNode mn = m.next;
 395                         if (m.tryMatch(h))          // help match
 396                             casHead(h, mn);         // pop both h and m
 397                         else                        // lost match
 398                             h.casNext(m, mn);       // help unlink
 399                     }
 400                 }
 401             }
 402         }
 403 
 404         /**
 405          * Spins/blocks until node s is matched by a fulfill operation.
 406          *
 407          * @param s the waiting node
 408          * @param timed true if timed wait
 409          * @param nanos timeout value
 410          * @return matched node, or s if cancelled
 411          */
 412         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
 413             /*
 414              * When a node/thread is about to block, it sets its waiter
 415              * field and then rechecks state at least one more time
 416              * before actually parking, thus covering race vs
 417              * fulfiller noticing that waiter is non-null so should be
 418              * woken.
 419              *
 420              * When invoked by nodes that appear at the point of call
 421              * to be at the head of the stack, calls to park are
 422              * preceded by spins to avoid blocking when producers and
 423              * consumers are arriving very close in time.  This can
 424              * happen enough to bother only on multiprocessors.
 425              *
 426              * The order of checks for returning out of main loop
 427              * reflects fact that interrupts have precedence over
 428              * normal returns, which have precedence over
 429              * timeouts. (So, on timeout, one last check for match is
 430              * done before giving up.) Except that calls from untimed
 431              * SynchronousQueue.{poll/offer} don't check interrupts
 432              * and don't wait at all, so are trapped in transfer
 433              * method rather than calling awaitFulfill.
 434              */
 435             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 436             Thread w = Thread.currentThread();
 437             int spins = shouldSpin(s)
 438                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 439                 : 0;
 440             for (;;) {
 441                 if (w.isInterrupted())
 442                     s.tryCancel();
 443                 SNode m = s.match;
 444                 if (m != null)
 445                     return m;
 446                 if (timed) {
 447                     nanos = deadline - System.nanoTime();
 448                     if (nanos <= 0L) {
 449                         s.tryCancel();
                         // loop to re-read s.match: the cancel CAS may have
                         // lost to a concurrent fulfill, which must be
                         // honored by returning m rather than timing out
 450                         continue;
 451                     }
 452                 }
 453                 if (spins > 0) {
 454                     Thread.onSpinWait();
                     // drop the remaining spin budget if s is no longer at
                     // head with a potential fulfiller nearby
 455                     spins = shouldSpin(s) ? (spins - 1) : 0;
 456                 }
 457                 else if (s.waiter == null)
 458                     s.waiter = w; // establish waiter so can park next iter
 459                 else if (!timed)
 460                     LockSupport.park(this);
                 // below the threshold, iterating through this loop is
                 // cheaper than a timed park/unpark round trip
 461                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
 462                     LockSupport.parkNanos(this, nanos);
 463             }
 464         }
 465 
 466         /**
 467          * Returns true if node s is at head or there is an active
 468          * fulfiller.
 469          */
 470         boolean shouldSpin(SNode s) {
 471             SNode h = head;
             // h == null: stack momentarily empty, so a fulfilling partner
             // could arrive at any instant; keep spinning in that case too
 472             return (h == s || h == null || isFulfilling(h.mode));
 473         }
 474 
 475         /**
 476          * Unlinks s from the stack.
 477          */
 478         void clean(SNode s) {
 479             s.item = null;   // forget item
 480             s.waiter = null; // forget thread
 481 
 482             /*
 483              * At worst we may need to traverse entire stack to unlink
 484              * s. If there are multiple concurrent calls to clean, we
 485              * might not see s if another thread has already removed
 486              * it. But we can stop when we see any node known to
 487              * follow s. We use s.next unless it too is cancelled, in
 488              * which case we try the node one past. We don't check any
 489              * further because we don't want to doubly traverse just to
 490              * find sentinel.
 491              */
 492 
 493             SNode past = s.next;
 494             if (past != null && past.isCancelled())
 495                 past = past.next;
 496 
 497             // Absorb cancelled nodes at head
 498             SNode p;
 499             while ((p = head) != null && p != past && p.isCancelled())
 500                 casHead(p, p.next);


 516                 MethodHandles.Lookup l = MethodHandles.lookup();
 517                 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
 518             } catch (ReflectiveOperationException e) {
 519                 throw new ExceptionInInitializerError(e);
 520             }
 521         }
 522     }
 523 
 524     /** Dual Queue */
 525     static final class TransferQueue<E> extends Transferer<E> {
 526         /*
 527          * This extends Scherer-Scott dual queue algorithm, differing,
 528          * among other ways, by using modes within nodes rather than
 529          * marked pointers. The algorithm is a little simpler than
 530          * that for stacks because fulfillers do not need explicit
 531          * nodes, and matching is done by CAS'ing QNode.item field
 532          * from non-null to null (for put) or vice versa (for take).
 533          */
 534 
 535         /** Node class for TransferQueue. */
 536         static final class QNode {
 537             volatile QNode next;          // next node in queue
 538             volatile Object item;         // CAS'ed to or from null
 539             volatile Thread waiter;       // to control park/unpark
 540             final boolean isData;
 541 
 542             QNode(Object item, boolean isData) {
 543                 this.item = item;
 544                 this.isData = isData;
 545             }
 546 
             // Both CAS helpers pre-read the volatile field to skip the CAS
             // when the expected value is already stale.
 547             boolean casNext(QNode cmp, QNode val) {
 548                 return next == cmp &&
 549                     QNEXT.compareAndSet(this, cmp, val);
 550             }
 551 
 552             boolean casItem(Object cmp, Object val) {
 553                 return item == cmp &&
 554                     QITEM.compareAndSet(this, cmp, val);
 555             }
 556 
 557             /**
 558              * Tries to cancel by CAS'ing ref to this as item.
 559              */
             // cmp is the item value the waiter originally enqueued; the
             // self-reference is the cancellation sentinel (see isCancelled).
 560             void tryCancel(Object cmp) {
 561                 QITEM.compareAndSet(this, cmp, this);
 562             }
 563 
 564             boolean isCancelled() {
 565                 return item == this;
 566             }
 567 
 568             /**
 569              * Returns true if this node is known to be off the queue
 570              * because its next pointer has been forgotten due to
 571              * an advanceHead operation.
 572              */
             // next == this is the off-list sentinel, distinct from the
             // item == this cancellation sentinel above.
 573             boolean isOffList() {
 574                 return next == this;
 575             }
 576 




















 577             // VarHandle mechanics
 578             private static final VarHandle QITEM;
 579             private static final VarHandle QNEXT;

 580             static {
 581                 try {
 582                     MethodHandles.Lookup l = MethodHandles.lookup();
 583                     QITEM = l.findVarHandle(QNode.class, "item", Object.class);
 584                     QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);

 585                 } catch (ReflectiveOperationException e) {
 586                     throw new ExceptionInInitializerError(e);
 587                 }
 588             }
 589         }
 590 
 591         /** Head of queue */
 592         transient volatile QNode head;
 593         /** Tail of queue */
 594         transient volatile QNode tail;
 595         /**
 596          * Reference to a cancelled node that might not yet have been
 597          * unlinked from queue because it was the last inserted node
 598          * when it was cancelled.
 599          */
         // NOTE(review): all three fields are transient — presumably
         // SynchronousQueue handles serialization without persisting queue
         // state; confirm against the enclosing class (not visible here).
 600         transient volatile QNode cleanMe;
 601 
 602         TransferQueue() {
 603             QNode h = new QNode(null, false); // initialize to dummy node.
 604             head = h;


 646              * 2. If queue apparently contains waiting items, and this
 647              *    call is of complementary mode, try to fulfill by CAS'ing
 648              *    item field of waiting node and dequeuing it, and then
 649              *    returning matching item.
 650              *
 651              * In each case, along the way, check for and try to help
 652              * advance head and tail on behalf of other stalled/slow
 653              * threads.
 654              *
 655              * The loop starts off with a null check guarding against
 656              * seeing uninitialized head or tail values. This never
 657              * happens in current SynchronousQueue, but could if
 658              * callers held non-volatile/final ref to the
 659              * transferer. The check is here anyway because it places
 660              * null checks at top of loop, which is usually faster
 661              * than having them implicitly interspersed.
 662              */
 663 
 664             QNode s = null; // constructed/reused as needed
 665             boolean isData = (e != null);
 666 
 667             for (;;) {
 668                 QNode t = tail;
 669                 QNode h = head;
 670                 if (t == null || h == null)         // saw uninitialized value
 671                     continue;                       // spin
 672 
 673                 if (h == t || t.isData == isData) { // empty or same-mode
 674                     QNode tn = t.next;
 675                     if (t != tail)                  // inconsistent read
 676                         continue;
 677                     if (tn != null) {               // lagging tail
 678                         advanceTail(t, tn);
 679                         continue;
 680                     }
 681                     if (timed && nanos <= 0L)       // can't wait
 682                         return null;
 683                     if (s == null)
 684                         s = new QNode(e, isData);
 685                     if (!t.casNext(null, s))        // failed to link in
 686                         continue;
 687 
 688                     advanceTail(t, s);              // swing tail and wait
 689                     Object x = awaitFulfill(s, e, timed, nanos);
 690                     if (x == s) {                   // wait was cancelled




 691                         clean(t, s);
 692                         return null;
 693                     }
 694 
























 695                     if (!s.isOffList()) {           // not already unlinked
 696                         advanceHead(t, s);          // unlink if head
 697                         if (x != null)              // and forget fields
 698                             s.item = s;
 699                         s.waiter = null;
 700                     }
 701                     return (x != null) ? (E)x : e;
 702 
 703                 } else {                            // complementary-mode
 704                     QNode m = h.next;               // node to fulfill
 705                     if (t != tail || m == null || h != head)
 706                         continue;                   // inconsistent read
 707 
 708                     Object x = m.item;
 709                     if (isData == (x != null) ||    // m already fulfilled
 710                         x == m ||                   // m cancelled
 711                         !m.casItem(x, e)) {         // lost CAS
 712                         advanceHead(h, m);          // dequeue and retry
 713                         continue;
 714                     }
 715 
 716                     advanceHead(h, m);              // successfully fulfilled
 717                     LockSupport.unpark(m.waiter);







 718                     return (x != null) ? (E)x : e;
 719                 }
 720             }
 721         }
 722 
 723         /**
 724          * Spins/blocks until node s is fulfilled.
 725          *
 726          * @param s the waiting node
 727          * @param e the comparison value for checking match
 728          * @param timed true if timed wait
 729          * @param nanos timeout value
 730          * @return matched item, or s if cancelled
 731          */
 732         Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
 733             /* Same idea as TransferStack.awaitFulfill */
 734             final long deadline = timed ? System.nanoTime() + nanos : 0L;
 735             Thread w = Thread.currentThread();
             // spin only when s is the next node to be fulfilled
 736             int spins = (head.next == s)
 737                 ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
 738                 : 0;
 739             for (;;) {
 740                 if (w.isInterrupted())
 741                     s.tryCancel(e);
 742                 Object x = s.item;
                 // item moved away from its original value e: either a
                 // fulfiller CAS'ed it (match) or tryCancel set it to s
                 // (cancelled); caller distinguishes by checking x == s
 743                 if (x != e)
 744                     return x;
 745                 if (timed) {
 746                     nanos = deadline - System.nanoTime();
 747                     if (nanos <= 0L) {
 748                         s.tryCancel(e);
                         // loop to re-read item: cancellation may have lost
                         // to a concurrent fulfill
 749                         continue;
 750                     }
 751                 }
 752                 if (spins > 0) {
 753                     --spins;
 754                     Thread.onSpinWait();
 755                 }
 756                 else if (s.waiter == null)
 757                     s.waiter = w;
 758                 else if (!timed)
 759                     LockSupport.park(this);
 760                 else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
 761                     LockSupport.parkNanos(this, nanos);
 762             }
 763         }
 764 
 765         /**
 766          * Gets rid of cancelled node s with original predecessor pred.
 767          */
 768         void clean(QNode pred, QNode s) {
 769             s.waiter = null; // forget thread
 770             /*
 771              * At any given time, exactly one node on list cannot be
 772              * deleted -- the last inserted node. To accommodate this,
 773              * if we cannot delete s, we save its predecessor as
 774              * "cleanMe", deleting the previously saved version
 775              * first. At least one of node s or the node previously
 776              * saved can always be deleted, so this always terminates.
 777              */
 778             while (pred.next == s) { // Return early if already unlinked
 779                 QNode h = head;
 780                 QNode hn = h.next;   // Absorb cancelled first node as head
 781                 if (hn != null && hn.isCancelled()) {
 782                     advanceHead(h, hn);
 783                     continue;
 784                 }
 785                 QNode t = tail;      // Ensure consistent read for tail
 786                 if (t == h)
 787                     return;
 788                 QNode tn = t.next;
 789                 if (t != tail)




 149      * Cleaning is done in different ways in queues vs stacks.  For
 150      * queues, we can almost always remove a node immediately in O(1)
 151      * time (modulo retries for consistency checks) when it is
 152      * cancelled. But if it may be pinned as the current tail, it must
 153      * wait until some subsequent cancellation. For stacks, we need a
 154      * potentially O(n) traversal to be sure that we can remove the
 155      * node, but this can run concurrently with other threads
 156      * accessing the stack.
 157      *
 158      * While garbage collection takes care of most node reclamation
 159      * issues that otherwise complicate nonblocking algorithms, care
 160      * is taken to "forget" references to data, other nodes, and
 161      * threads that might be held on to long-term by blocked
 162      * threads. In cases where setting to null would otherwise
 163      * conflict with main algorithms, this is done by changing a
 164      * node's link to now point to the node itself. This doesn't arise
 165      * much for Stack nodes (because blocked threads do not hang on to
 166      * old head pointers), but references in Queue nodes must be
 167      * aggressively forgotten to avoid reachability of everything any
 168      * node has ever referred to since arrival.
 169      *
 170      * The above steps improve throughput when many threads produce
 171      * and/or consume data. But they don't help much with
 172      * single-source / single-sink usages in which one side or the
 173      * other is always transiently blocked, and so throughput is
 174      * mainly a function of thread scheduling. This is not usually
 175      * noticeably improved with bounded short spin-waits. Instead both
 176      * forms of transfer try Thread.yield if apparently the sole
 177      * waiter. This works well when there are more tasks than cores,
 178      * which is expected to be the main usage context of this mode. In
 179      * other cases, waiters may help with some bookkeeping, then
 180      * park/unpark.
 181      */
 182 
 183     /**
 184      * Shared internal API for dual stacks and queues.
 185      */
         // Extended by TransferStack (LIFO order) and TransferQueue (FIFO
         // order) below; both implement the single transfer method, which
         // unifies put and take into one dual-data-structure operation.
 186     abstract static class Transferer<E> {
 187         /**
 188          * Performs a put or take.
 189          *
 190          * @param e if non-null, the item to be handed to a consumer;
 191          *          if null, requests that transfer return an item
 192          *          offered by producer.
 193          * @param timed if this operation should timeout
 194          * @param nanos the timeout, in nanoseconds
 195          * @return if non-null, the item provided or received; if null,
 196          *         the operation failed due to timeout or interrupt --
 197          *         the caller can distinguish which of these occurred
 198          *         by checking Thread.interrupted.
 199          */
 200         abstract E transfer(E e, boolean timed, long nanos);
 201     }
 202 
 203     /**

















 204      * The number of nanoseconds for which it is faster to spin
 205      * rather than to use timed park. A rough estimate suffices.
 206      */
     // NOTE(review): the previous revision of this file used 1000L here;
     // this revision reads 1023L. Confirm the change from the round value
     // is intentional (e.g. chosen as a power-of-two-minus-1) and not a
     // transcription artifact of this diff.
 207     static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;
 208 
 209     /** Dual stack */
 210     static final class TransferStack<E> extends Transferer<E> {
 211         /*
 212          * This extends Scherer-Scott dual stack algorithm, differing,
 213          * among other ways, by using "covering" nodes rather than
 214          * bit-marked pointers: Fulfilling operations push on marker
 215          * nodes (with FULFILLING bit set in mode) to reserve a spot
 216          * to match a waiting node.
 217          */
 218 
 219         /* Modes for SNodes, ORed together in node fields */
 220         /** Node represents an unfulfilled consumer */
 221         static final int REQUEST    = 0;
 222         /** Node represents an unfulfilled producer */
 223         static final int DATA       = 1;
             // FULFILLING is a bit flag: a fulfiller pushes a node whose
             // mode is (FULFILLING | REQUEST) or (FULFILLING | DATA).
 224         /** Node is fulfilling another unfulfilled DATA or REQUEST */
 225         static final int FULFILLING = 2;
 226 
 227         /** Returns true if m has fulfilling bit set. */
 228         static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
 229 
 230         /** Node class for TransferStacks. */
 231         static final class SNode implements ForkJoinPool.ManagedBlocker {
 232             volatile SNode next;        // next node in stack
 233             volatile SNode match;       // the node matched to this
 234             volatile Thread waiter;     // to control park/unpark
 235             Object item;                // data; or null for REQUESTs
 236             int mode;
 237             // Note: item and mode fields don't need to be volatile
 238             // since they are always written before, and read after,
 239             // other volatile/atomic operations.
 240 
 241             SNode(Object item) {
 242                 this.item = item;
 243             }
 244 
             // Volatile pre-read screens out stale expected values before
             // paying for the CAS.
 245             boolean casNext(SNode cmp, SNode val) {
 246                 return cmp == next &&
 247                     SNEXT.compareAndSet(this, cmp, val);
 248             }
 249 
 250             /**
 251              * Tries to match node s to this node, if so, waking up thread.
 252              * Fulfillers call tryMatch to identify their waiters.
 253              * Waiters block until they have been matched.
 254              *
 255              * @param s the node to match
 256              * @return true if successfully matched to s
 257              */
 258             boolean tryMatch(SNode s) {
 259                 SNode m; Thread w;
 260                 if ((m = match) == null) {
 261                     if (SMATCH.compareAndSet(this, null, s)) {
 262                         if ((w = waiter) != null)

 263                             LockSupport.unpark(w);

 264                         return true;
 265                     }
                     else
                         // re-read so the final check below sees the winner
 267                         m = match;
 268                 }
                 // success iff the installed match is s itself (possibly
                 // installed by a concurrent helper)
 269                 return m == s;
 270             }
 271 
 272             /**
 273              * Tries to cancel a wait by matching node to itself.
 274              */
             // Self-match is the cancellation sentinel; returns whether
             // this thread's CAS won (i.e. the cancel took effect).
 275             boolean tryCancel() {
 276                 return SMATCH.compareAndSet(this, null, this);
 277             }
 278 
 279             boolean isCancelled() {
 280                 return match == this;
 281             }
 282 
             // ForkJoinPool.ManagedBlocker protocol (this class implements
             // it): releasable once matched/cancelled or the current thread
             // is interrupted.
 283             public final boolean isReleasable() {
 284                 return match != null || Thread.currentThread().isInterrupted();
 285             }
 286 
 287             public final boolean block() {
 288                 while (!isReleasable()) LockSupport.park();
 289                 return true;
 290             }
 291 
             // Opaque-mode write: publishes the null without imposing
             // ordering; NOTE(review): assumes only eventual visibility of
             // the cleared waiter is needed — confirm against callers.
 292             void forgetWaiter() {
 293                 SWAITER.setOpaque(this, null);
 294             }
 295 
 296             // VarHandle mechanics
 297             private static final VarHandle SMATCH;
 298             private static final VarHandle SNEXT;
 299             private static final VarHandle SWAITER;
 300             static {
 301                 try {
 302                     MethodHandles.Lookup l = MethodHandles.lookup();
 303                     SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
 304                     SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
 305                     SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
 306                 } catch (ReflectiveOperationException e) {
 307                     throw new ExceptionInInitializerError(e);
 308                 }
 309             }
 310         }
 311 
 312         /** The head (top) of the stack */
 313         volatile SNode head;
 314 
 315         boolean casHead(SNode h, SNode nh) {
 316             return h == head &&
 317                 SHEAD.compareAndSet(this, h, nh);
 318         }
 319 
 320         /**
 321          * Creates or resets fields of a node. Called only from transfer
 322          * where the node to push on stack is lazily created and
 323          * reused when possible to help reduce intervals between reads
 324          * and CASes of head and to avoid surges of garbage when CASes
 325          * to push nodes fail due to contention.


             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             */

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0L) {     // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        // Pushed as a waiter: spin/yield/park until a
                        // fulfiller matches us, or timeout/interrupt cancels.
                        long deadline = timed ? System.nanoTime() + nanos : 0L;
                        Thread w = Thread.currentThread();
                        int stat = -1; // -1: may yield, +1: park, else 0
                        SNode m;                    // await fulfill or cancel
                        while ((m = s.match) == null) {
                            if ((timed &&
                                 (nanos = deadline - System.nanoTime()) <= 0) ||
                                w.isInterrupted()) {
                                if (s.tryCancel()) {
                                    clean(s);       // wait cancelled
                                    return null;
                                }   // else: lost cancel race; loop re-reads match
                            } else if ((m = s.match) != null) {
                                break;              // recheck
                            } else if (stat <= 0) {
                                if (stat < 0 && h == null && head == s) {
                                    stat = 0;       // yield once if was empty
                                    Thread.yield();
                                } else {
                                    stat = 1;
                                    s.waiter = w;   // enable signal
                                }
                            } else if (!timed) {
                                // Untimed: block via ManagedBlocker so FJP
                                // workers can compensate for this wait.
                                LockSupport.setCurrentBlocker(this);
                                try {
                                    ForkJoinPool.managedBlock(s);
                                } catch (InterruptedException cannotHappen) { }
                                LockSupport.setCurrentBlocker(null);
                            } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                                LockSupport.parkNanos(this, nanos);
                        }
                        if (stat == 1)
                            s.forgetWaiter();       // drop thread ref after signalled wait
                        Object result = (mode == REQUEST) ? m.item : s.item;
                        if (h != null && h.next == s)
                            casHead(h, s.next);     // help fulfiller
                        return (E) result;
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
 443 
 444         /**







































































 445          * Unlinks s from the stack.
 446          */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.forgetWaiter();

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */

            SNode past = s.next;        // first node known to follow s
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);


 485                 MethodHandles.Lookup l = MethodHandles.lookup();
 486                 SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class);
 487             } catch (ReflectiveOperationException e) {
 488                 throw new ExceptionInInitializerError(e);
 489             }
 490         }
 491     }
 492 
 493     /** Dual Queue */
 494     static final class TransferQueue<E> extends Transferer<E> {
 495         /*
 496          * This extends Scherer-Scott dual queue algorithm, differing,
 497          * among other ways, by using modes within nodes rather than
 498          * marked pointers. The algorithm is a little simpler than
 499          * that for stacks because fulfillers do not need explicit
 500          * nodes, and matching is done by CAS'ing QNode.item field
 501          * from non-null to null (for put) or vice versa (for take).
 502          */
 503 
        /**
         * Node class for TransferQueue.  Matching is performed by
         * CASing the item field: from non-null to null when a DATA
         * (put) node is taken, from null to non-null when a REQUEST
         * (take) node is filled.  A node whose item points to itself
         * is cancelled.  Implements ForkJoinPool.ManagedBlocker so
         * that waits by ForkJoinPool workers can be compensated.
         */
        static final class QNode implements ForkJoinPool.ManagedBlocker {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;         // true for DATA (put) nodes

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            /**
             * CASes next from cmp to val.  The volatile pre-read of
             * next screens out CAS attempts that are already stale.
             */
            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    QNEXT.compareAndSet(this, cmp, val);
            }

            /**
             * CASes item from cmp to val, pre-reading item to skip
             * CAS attempts that are already stale.
             */
            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    QITEM.compareAndSet(this, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            boolean tryCancel(Object cmp) {
                return QITEM.compareAndSet(this, cmp, this);
            }

            /** Returns true if this node was cancelled (item points to itself). */
            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             */
            boolean isOffList() {
                return next == this;
            }

            /**
             * Clears the waiter field so a long-dead thread is not
             * retained.  Opaque mode suffices here: waiter is only a
             * wakeup hint read by fulfillers.
             */
            void forgetWaiter() {
                QWAITER.setOpaque(this, null);
            }

            /**
             * Returns true if this node has been matched (its item's
             * nullness no longer agrees with its mode) or cancelled
             * (item points to itself).
             */
            boolean isFulfilled() {
                Object x;
                return isData == ((x = item) == null) || x == this;
            }

            /**
             * ManagedBlocker hook: releasable once fulfilled or
             * cancelled, or when the current thread has been
             * interrupted.
             */
            public final boolean isReleasable() {
                Object x;
                return isData == ((x = item) == null) || x == this ||
                    Thread.currentThread().isInterrupted();
            }

            /** ManagedBlocker hook: park until releasable. */
            public final boolean block() {
                while (!isReleasable()) LockSupport.park();
                return true;
            }

            // VarHandle mechanics
            private static final VarHandle QITEM;
            private static final VarHandle QNEXT;
            private static final VarHandle QWAITER;
            static {
                try {
                    MethodHandles.Lookup l = MethodHandles.lookup();
                    QITEM = l.findVarHandle(QNode.class, "item", Object.class);
                    QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
                    QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
                } catch (ReflectiveOperationException e) {
                    throw new ExceptionInInitializerError(e);
                }
            }
        }
 581 
 582         /** Head of queue */
 583         transient volatile QNode head;
 584         /** Tail of queue */
 585         transient volatile QNode tail;
 586         /**
 587          * Reference to a cancelled node that might not yet have been
 588          * unlinked from queue because it was the last inserted node
 589          * when it was cancelled.
 590          */
 591         transient volatile QNode cleanMe;
 592 
 593         TransferQueue() {
 594             QNode h = new QNode(null, false); // initialize to dummy node.
 595             head = h;


 637              * 2. If queue apparently contains waiting items, and this
 638              *    call is of complementary mode, try to fulfill by CAS'ing
 639              *    item field of waiting node and dequeuing it, and then
 640              *    returning matching item.
 641              *
 642              * In each case, along the way, check for and try to help
 643              * advance head and tail on behalf of other stalled/slow
 644              * threads.
 645              *
 646              * The loop starts off with a null check guarding against
 647              * seeing uninitialized head or tail values. This never
 648              * happens in current SynchronousQueue, but could if
 649              * callers held non-volatile/final ref to the
 650              * transferer. The check is here anyway because it places
 651              * null checks at top of loop, which is usually faster
 652              * than having them implicitly interspersed.
 653              */
 654 
            QNode s = null;                  // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail, h = head, m, tn;         // m is node to fulfill
                if (t == null || h == null)
                    ;                                    // inconsistent
                else if (h == t || t.isData == isData) { // empty or same-mode
                    if (t != tail)                       // inconsistent
                        ;
                    else if ((tn = t.next) != null)      // lagging tail
                        advanceTail(t, tn);
                    else if (timed && nanos <= 0L)       // can't wait
                        return null;
                    else if (t.casNext(null, (s != null) ? s :
                                       (s = new QNode(e, isData)))) {
                        // Enqueued as a waiter: spin/yield/park until a
                        // fulfiller CASes our item, or timeout/interrupt
                        // cancels.
                        advanceTail(t, s);
                        long deadline = timed ? System.nanoTime() + nanos : 0L;
                        Thread w = Thread.currentThread();
                        int stat = -1; // same idea as TransferStack
                        Object item;
                        while ((item = s.item) == e) {
                            if ((timed &&
                                 (nanos = deadline - System.nanoTime()) <= 0) ||
                                w.isInterrupted()) {
                                if (s.tryCancel(e)) {
                                    clean(t, s);
                                    return null;
                                }   // else: lost cancel race; loop re-reads item
                            } else if ((item = s.item) != e) {
                                break;                   // recheck
                            } else if (stat <= 0) {
                                if (t.next == s) {
                                    if (stat < 0 && t.isFulfilled()) {
                                        stat = 0;        // yield once if first
                                        Thread.yield();
                                    }
                                    else {
                                        stat = 1;
                                        s.waiter = w;    // enable signal
                                    }
                                }
                            } else if (!timed) {
                                // Untimed: block via ManagedBlocker so FJP
                                // workers can compensate for this wait.
                                LockSupport.setCurrentBlocker(this);
                                try {
                                    ForkJoinPool.managedBlock(s);
                                } catch (InterruptedException cannotHappen) { }
                                LockSupport.setCurrentBlocker(null);
                            }
                            else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
                                LockSupport.parkNanos(this, nanos);
                        }
                        if (stat == 1)
                            s.forgetWaiter();            // drop thread ref
                        if (!s.isOffList()) {            // not already unlinked
                            advanceHead(t, s);           // unlink if head
                            if (item != null)            // and forget fields
                                s.item = s;
                        }
                        return (item != null) ? (E)item : e;
                    }

                } else if ((m = h.next) != null && t == tail && h == head) {
                    // Complementary mode: try to fulfill the first waiter m.
                    Thread waiter;
                    Object x = m.item;
                    boolean fulfilled = ((isData == (x == null)) &&
                                         x != m && m.casItem(x, e));
                    advanceHead(h, m);                    // (help) dequeue
                    if (fulfilled) {
                        if ((waiter = m.waiter) != null)
                            LockSupport.unpark(waiter);
                        return (x != null) ? (E)x : e;
                    }
                }
            }









































 729         }
 730 
 731         /**
 732          * Gets rid of cancelled node s with original predecessor pred.
 733          */
 734         void clean(QNode pred, QNode s) {
 735             s.forgetWaiter();
 736             /*
 737              * At any given time, exactly one node on list cannot be
 738              * deleted -- the last inserted node. To accommodate this,
 739              * if we cannot delete s, we save its predecessor as
 740              * "cleanMe", deleting the previously saved version
 741              * first. At least one of node s or the node previously
 742              * saved can always be deleted, so this always terminates.
 743              */
 744             while (pred.next == s) { // Return early if already unlinked
 745                 QNode h = head;
 746                 QNode hn = h.next;   // Absorb cancelled first node as head
 747                 if (hn != null && hn.isCancelled()) {
 748                     advanceHead(h, hn);
 749                     continue;
 750                 }
 751                 QNode t = tail;      // Ensure consistent read for tail
 752                 if (t == h)
 753                     return;
 754                 QNode tn = t.next;
 755                 if (t != tail)


< prev index next >