1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 /* 35 * @test 36 * @bug 4486658 6785442 37 * @summary Checks that a set of threads can repeatedly get and modify items 38 * @library /test/lib 39 * @run main ConcurrentQueueLoops 8 123456 40 */ 41 42 import static java.util.concurrent.TimeUnit.MILLISECONDS; 43 44 import java.util.ArrayList; 45 import java.util.Collection; 46 import java.util.Collections; 47 import java.util.List; 48 import java.util.Queue; 49 import java.util.concurrent.ArrayBlockingQueue; 50 import java.util.concurrent.ConcurrentLinkedDeque; 51 import java.util.concurrent.ConcurrentLinkedQueue; 52 import java.util.concurrent.Callable; 53 import java.util.concurrent.CyclicBarrier; 54 import java.util.concurrent.ExecutorService; 55 import java.util.concurrent.Executors; 56 import java.util.concurrent.Future; 57 import java.util.concurrent.LinkedBlockingDeque; 58 import java.util.concurrent.LinkedBlockingQueue; 59 import java.util.concurrent.LinkedTransferQueue; 60 import java.util.concurrent.atomic.AtomicInteger; 61 import jdk.test.lib.Utils; 62 63 public class ConcurrentQueueLoops { 64 static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); 65 ExecutorService pool; 66 AtomicInteger totalItems; 67 boolean print; 68 69 // Suitable for benchmarking. Overridden by args[0] for testing. 70 int maxStages = 20; 71 72 // Suitable for benchmarking. Overridden by args[1] for testing. 73 int items = 1024 * 1024; 74 75 Collection<Queue<Integer>> concurrentQueues() { 76 List<Queue<Integer>> queues = new ArrayList<>(); 77 queues.add(new ConcurrentLinkedDeque<Integer>()); 78 queues.add(new ConcurrentLinkedQueue<Integer>()); 79 queues.add(new ArrayBlockingQueue<Integer>(items, false)); 80 //queues.add(new ArrayBlockingQueue<Integer>(count, true)); 81 queues.add(new LinkedBlockingQueue<Integer>()); 82 queues.add(new LinkedBlockingDeque<Integer>()); 83 queues.add(new LinkedTransferQueue<Integer>()); 84 85 // Following additional implementations are available from: 86 // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html 87 // queues.add(new SynchronizedLinkedListQueue<Integer>()); 88 89 // Avoid "first fast, second slow" benchmark effect. 90 Collections.shuffle(queues); 91 return queues; 92 } 93 94 void test(String[] args) throws Throwable { 95 if (args.length > 0) 96 maxStages = Integer.parseInt(args[0]); 97 if (args.length > 1) 98 items = Integer.parseInt(args[1]); 99 100 for (Queue<Integer> queue : concurrentQueues()) 101 test(queue); 102 } 103 104 void test(final Queue<Integer> q) throws Throwable { 105 System.out.println(q.getClass().getSimpleName()); 106 pool = Executors.newCachedThreadPool(); 107 print = false; 108 109 print = false; 110 System.out.println("Warmup..."); 111 oneRun(1, items, q); 112 oneRun(3, items, q); 113 print = true; 114 115 for (int i = 1; i <= maxStages; i += (i+1) >>> 1) { 116 oneRun(i, items, q); 117 } 118 pool.shutdown(); 119 check(pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 120 } 121 122 class Stage implements Callable<Integer> { 123 final Queue<Integer> queue; 124 final CyclicBarrier barrier; 125 int items; 126 Stage(Queue<Integer> q, CyclicBarrier b, int items) { 127 queue = q; 128 barrier = b; 129 this.items = items; 130 } 131 132 public Integer call() { 133 // Repeatedly take something from queue if possible, 134 // transform it, and put back in. 135 try { 136 barrier.await(); 137 int l = 4321; 138 int takes = 0; 139 for (;;) { 140 Integer item = queue.poll(); 141 if (item != null) { 142 ++takes; 143 l = LoopHelpers.compute2(item.intValue()); 144 } 145 else if (takes != 0) { 146 totalItems.getAndAdd(-takes); 147 takes = 0; 148 } 149 else if (totalItems.get() <= 0) 150 break; 151 l = LoopHelpers.compute1(l); 152 if (items > 0) { 153 --items; 154 queue.offer(new Integer(l)); 155 } 156 else if ( (l & (3 << 5)) == 0) // spinwait 157 Thread.sleep(1); 158 } 159 return new Integer(l); 160 } 161 catch (Throwable t) { unexpected(t); return null; } 162 } 163 } 164 165 void oneRun(int n, int items, final Queue<Integer> q) throws Exception { 166 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); 167 CyclicBarrier barrier = new CyclicBarrier(n + 1, timer); 168 totalItems = new AtomicInteger(n * items); 169 ArrayList<Future<Integer>> results = new ArrayList<>(n); 170 for (int i = 0; i < n; ++i) 171 results.add(pool.submit(new Stage(q, barrier, items))); 172 173 if (print) 174 System.out.print("Threads: " + n + "\t:"); 175 barrier.await(); 176 int total = 0; 177 for (int i = 0; i < n; ++i) { 178 Future<Integer> f = results.get(i); 179 Integer r = f.get(); 180 total += r.intValue(); 181 } 182 long endTime = System.nanoTime(); 183 long time = endTime - timer.startTime; 184 if (print) 185 System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item"); 186 if (total == 0) // avoid overoptimization 187 System.out.println("useless result: " + total); 188 } 189 190 //--------------------- Infrastructure --------------------------- 191 volatile int passed = 0, failed = 0; 192 void pass() {passed++;} 193 void fail() {failed++; Thread.dumpStack();} 194 void fail(String msg) {System.err.println(msg); fail();} 195 void unexpected(Throwable t) {failed++; t.printStackTrace();} 196 void check(boolean cond) {if (cond) pass(); else fail();} 197 void equal(Object x, Object y) { 198 if (x == null ? y == null : x.equals(y)) pass(); 199 else fail(x + " not equal to " + y);} 200 public static void main(String[] args) throws Throwable { 201 new ConcurrentQueueLoops().instanceMain(args);} 202 public void instanceMain(String[] args) throws Throwable { 203 try {test(args);} catch (Throwable t) {unexpected(t);} 204 System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); 205 if (failed > 0) throw new AssertionError("Some tests failed");} 206 }