1 /*
   2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   3  *
   4  * This code is free software; you can redistribute it and/or modify it
   5  * under the terms of the GNU General Public License version 2 only, as
   6  * published by the Free Software Foundation.
   7  *
   8  * This code is distributed in the hope that it will be useful, but WITHOUT
   9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  11  * version 2 for more details (a copy is included in the LICENSE file that
  12  * accompanied this code).
  13  *
  14  * You should have received a copy of the GNU General Public License version
  15  * 2 along with this work; if not, write to the Free Software Foundation,
  16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  17  *
  18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  19  * or visit www.oracle.com if you need additional information or have any
  20  * questions.
  21  */
  22 
  23 /*
  24  * This file is available under and governed by the GNU General Public
  25  * License version 2 only, as published by the Free Software Foundation.
  26  * However, the following notice accompanied the original version of this
  27  * file:
  28  *
  29  * Written by Doug Lea with assistance from members of JCP JSR-166
  30  * Expert Group and released to the public domain, as explained at
  31  * http://creativecommons.org/publicdomain/zero/1.0/
  32  */
  33 
  34 /*
  35  * @test
  36  * @bug 4486658 6785442
  37  * @summary Checks that a set of threads can repeatedly get and modify items
  38  * @library /test/lib
  39  * @run main ConcurrentQueueLoops 8 123456
  40  */
  41 
  42 import static java.util.concurrent.TimeUnit.MILLISECONDS;
  43 
  44 import java.util.ArrayList;
  45 import java.util.Collection;
  46 import java.util.Collections;
  47 import java.util.List;
  48 import java.util.Queue;
  49 import java.util.concurrent.ArrayBlockingQueue;
  50 import java.util.concurrent.ConcurrentLinkedDeque;
  51 import java.util.concurrent.ConcurrentLinkedQueue;
  52 import java.util.concurrent.Callable;
  53 import java.util.concurrent.CyclicBarrier;
  54 import java.util.concurrent.ExecutorService;
  55 import java.util.concurrent.Executors;
  56 import java.util.concurrent.Future;
  57 import java.util.concurrent.LinkedBlockingDeque;
  58 import java.util.concurrent.LinkedBlockingQueue;
  59 import java.util.concurrent.LinkedTransferQueue;
  60 import java.util.concurrent.atomic.AtomicInteger;
  61 import jdk.test.lib.Utils;
  62 
  63 public class ConcurrentQueueLoops {
  64     static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
  65     ExecutorService pool;
  66     AtomicInteger totalItems;
  67     boolean print;
  68 
  69     // Suitable for benchmarking.  Overridden by args[0] for testing.
  70     int maxStages = 20;
  71 
  72     // Suitable for benchmarking.  Overridden by args[1] for testing.
  73     int items = 1024 * 1024;
  74 
  75     Collection<Queue<Integer>> concurrentQueues() {
  76         List<Queue<Integer>> queues = new ArrayList<>();
  77         queues.add(new ConcurrentLinkedDeque<Integer>());
  78         queues.add(new ConcurrentLinkedQueue<Integer>());
  79         queues.add(new ArrayBlockingQueue<Integer>(items, false));
  80         //queues.add(new ArrayBlockingQueue<Integer>(count, true));
  81         queues.add(new LinkedBlockingQueue<Integer>());
  82         queues.add(new LinkedBlockingDeque<Integer>());
  83         queues.add(new LinkedTransferQueue<Integer>());
  84 
  85         // Following additional implementations are available from:
  86         // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html
  87         // queues.add(new SynchronizedLinkedListQueue<Integer>());
  88 
  89         // Avoid "first fast, second slow" benchmark effect.
  90         Collections.shuffle(queues);
  91         return queues;
  92     }
  93 
  94     void test(String[] args) throws Throwable {
  95         if (args.length > 0)
  96             maxStages = Integer.parseInt(args[0]);
  97         if (args.length > 1)
  98             items = Integer.parseInt(args[1]);
  99 
 100         for (Queue<Integer> queue : concurrentQueues())
 101             test(queue);
 102     }
 103 
 104     void test(final Queue<Integer> q) throws Throwable {
 105         System.out.println(q.getClass().getSimpleName());
 106         pool = Executors.newCachedThreadPool();
 107         print = false;
 108 
 109         print = false;
 110         System.out.println("Warmup...");
 111         oneRun(1, items, q);
 112         oneRun(3, items, q);
 113         print = true;
 114 
 115         for (int i = 1; i <= maxStages; i += (i+1) >>> 1) {
 116             oneRun(i, items, q);
 117         }
 118         pool.shutdown();
 119         check(pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
 120    }
 121 
 122     class Stage implements Callable<Integer> {
 123         final Queue<Integer> queue;
 124         final CyclicBarrier barrier;
 125         int items;
 126         Stage(Queue<Integer> q, CyclicBarrier b, int items) {
 127             queue = q;
 128             barrier = b;
 129             this.items = items;
 130         }
 131 
 132         public Integer call() {
 133             // Repeatedly take something from queue if possible,
 134             // transform it, and put back in.
 135             try {
 136                 barrier.await();
 137                 int l = 4321;
 138                 int takes = 0;
 139                 for (;;) {
 140                     Integer item = queue.poll();
 141                     if (item != null) {
 142                         ++takes;
 143                         l = LoopHelpers.compute2(item.intValue());
 144                     }
 145                     else if (takes != 0) {
 146                         totalItems.getAndAdd(-takes);
 147                         takes = 0;
 148                     }
 149                     else if (totalItems.get() <= 0)
 150                         break;
 151                     l = LoopHelpers.compute1(l);
 152                     if (items > 0) {
 153                         --items;
 154                         queue.offer(new Integer(l));
 155                     }
 156                     else if ( (l & (3 << 5)) == 0) // spinwait
 157                         Thread.sleep(1);
 158                 }
 159                 return new Integer(l);
 160             }
 161             catch (Throwable t) { unexpected(t); return null; }
 162         }
 163     }
 164 
 165     void oneRun(int n, int items, final Queue<Integer> q) throws Exception {
 166         LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
 167         CyclicBarrier barrier = new CyclicBarrier(n + 1, timer);
 168         totalItems = new AtomicInteger(n * items);
 169         ArrayList<Future<Integer>> results = new ArrayList<>(n);
 170         for (int i = 0; i < n; ++i)
 171             results.add(pool.submit(new Stage(q, barrier, items)));
 172 
 173         if (print)
 174             System.out.print("Threads: " + n + "\t:");
 175         barrier.await();
 176         int total = 0;
 177         for (int i = 0; i < n; ++i) {
 178             Future<Integer> f = results.get(i);
 179             Integer r = f.get();
 180             total += r.intValue();
 181         }
 182         long endTime = System.nanoTime();
 183         long time = endTime - timer.startTime;
 184         if (print)
 185             System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item");
 186         if (total == 0) // avoid overoptimization
 187             System.out.println("useless result: " + total);
 188     }
 189 
 190     //--------------------- Infrastructure ---------------------------
 191     volatile int passed = 0, failed = 0;
 192     void pass() {passed++;}
 193     void fail() {failed++; Thread.dumpStack();}
 194     void fail(String msg) {System.err.println(msg); fail();}
 195     void unexpected(Throwable t) {failed++; t.printStackTrace();}
 196     void check(boolean cond) {if (cond) pass(); else fail();}
 197     void equal(Object x, Object y) {
 198         if (x == null ? y == null : x.equals(y)) pass();
 199         else fail(x + " not equal to " + y);}
 200     public static void main(String[] args) throws Throwable {
 201         new ConcurrentQueueLoops().instanceMain(args);}
 202     public void instanceMain(String[] args) throws Throwable {
 203         try {test(args);} catch (Throwable t) {unexpected(t);}
 204         System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
 205         if (failed > 0) throw new AssertionError("Some tests failed");}
 206 }