1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 /* 35 * @test 36 * @bug 8005696 37 * @summary Basic tests for CompletableFuture 38 * @library /test/lib 39 * @run main Basic 40 * @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 Basic 41 * @author Chris Hegarty 42 */ 43 44 import static java.util.concurrent.CompletableFuture.runAsync; 45 import static java.util.concurrent.CompletableFuture.supplyAsync; 46 import static java.util.concurrent.ForkJoinPool.commonPool; 47 import static java.util.concurrent.TimeUnit.MILLISECONDS; 48 import static java.util.concurrent.TimeUnit.SECONDS; 49 50 import java.lang.reflect.Array; 51 import java.util.concurrent.Phaser; 52 import java.util.concurrent.CompletableFuture; 53 import java.util.concurrent.CompletionException; 54 import java.util.concurrent.CancellationException; 55 import java.util.concurrent.ExecutionException; 56 import java.util.concurrent.ExecutorService; 57 import java.util.concurrent.Executors; 58 import java.util.concurrent.atomic.AtomicInteger; 59 import jdk.test.lib.Utils; 60 61 public class Basic { 62 static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); 63 64 static void checkCompletedNormally(CompletableFuture<?> cf, Object value) { 65 checkCompletedNormally(cf, value == null ? null : new Object[] { value }); 66 } 67 68 static void checkCompletedNormally(CompletableFuture<?> cf, Object[] values) { 69 try { equalAnyOf(cf.join(), values); } catch (Throwable x) { unexpected(x); } 70 try { equalAnyOf(cf.getNow(null), values); } catch (Throwable x) { unexpected(x); } 71 try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); } 72 try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); } 73 check(cf.isDone(), "Expected isDone to be true, got:" + cf); 74 check(!cf.isCompletedExceptionally(), "Expected isCompletedExceptionally to return false"); 75 check(!cf.isCancelled(), "Expected isCancelled to be false"); 76 check(!cf.cancel(true), "Expected cancel to return false"); 77 check(cf.toString().matches(".*\\[.*Completed normally.*\\]")); 78 check(cf.complete(null) == false, "Expected complete() to fail"); 79 check(cf.completeExceptionally(new Throwable()) == false, 80 "Expected completeExceptionally() to fail"); 81 } 82 83 static <T> void checkCompletedExceptionally(CompletableFuture<T> cf) 84 throws Exception 85 { 86 checkCompletedExceptionally(cf, false); 87 } 88 89 @SuppressWarnings("unchecked") 90 static <T> void checkCompletedExceptionally(CompletableFuture<T> cf, boolean cancelled) 91 throws Exception 92 { 93 try { cf.join(); fail("Excepted exception to be thrown"); } 94 catch (CompletionException x) { if (cancelled) fail(); else pass(); } 95 catch (CancellationException x) { if (cancelled) pass(); else fail(); } 96 try { cf.getNow(null); fail("Excepted exception to be thrown"); } 97 catch (CompletionException x) { if (cancelled) fail(); else pass(); } 98 catch (CancellationException x) { if (cancelled) pass(); else fail(); } 99 try { cf.get(); fail("Excepted exception to be thrown");} 100 catch (CancellationException x) { if (cancelled) pass(); else fail(); } 101 catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); } 102 try { cf.get(0L, SECONDS); fail("Excepted exception to be thrown");} 103 catch (CancellationException x) { if (cancelled) pass(); else fail(); } 104 catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); } 105 check(cf.isDone(), "Expected isDone to be true, got:" + cf); 106 check(cf.isCompletedExceptionally(), "Expected isCompletedExceptionally"); 107 check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled()); 108 check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:" + cf.cancel(true)); 109 check(cf.toString().matches(".*\\[.*Completed exceptionally.*\\]")); // ## TODO: 'E'xceptionally 110 check(cf.complete((T)new Object()) == false, "Expected complete() to fail"); 111 check(cf.completeExceptionally(new Throwable()) == false, 112 "Expected completeExceptionally() to fail, already completed"); 113 } 114 115 private static void realMain(String[] args) throws Throwable { 116 ExecutorService pool = Executors.newFixedThreadPool(2); 117 try { 118 test(pool); 119 } finally { 120 pool.shutdown(); 121 if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS)) 122 throw new Error(); 123 } 124 } 125 126 static AtomicInteger atomicInt = new AtomicInteger(0); 127 128 private static void test(ExecutorService executor) throws Throwable { 129 130 Thread.currentThread().setName("mainThread"); 131 132 //---------------------------------------------------------------- 133 // supplyAsync tests 134 //---------------------------------------------------------------- 135 try { 136 CompletableFuture<String> cf = supplyAsync(() -> "a test string"); 137 checkCompletedNormally(cf, cf.join()); 138 cf = supplyAsync(() -> "a test string", commonPool()); 139 checkCompletedNormally(cf, cf.join()); 140 cf = supplyAsync(() -> "a test string", executor); 141 checkCompletedNormally(cf, cf.join()); 142 cf = supplyAsync(() -> { throw new RuntimeException(); }); 143 checkCompletedExceptionally(cf); 144 cf = supplyAsync(() -> { throw new RuntimeException(); }, commonPool()); 145 checkCompletedExceptionally(cf); 146 cf = supplyAsync(() -> { throw new RuntimeException(); }, executor); 147 checkCompletedExceptionally(cf); 148 } catch (Throwable t) { unexpected(t); } 149 150 //---------------------------------------------------------------- 151 // runAsync tests 152 //---------------------------------------------------------------- 153 try { 154 CompletableFuture<Void> cf = runAsync(() -> {}); 155 checkCompletedNormally(cf, cf.join()); 156 cf = runAsync(() -> {}, commonPool()); 157 checkCompletedNormally(cf, cf.join()); 158 cf = runAsync(() -> {}, executor); 159 checkCompletedNormally(cf, cf.join()); 160 cf = runAsync(() -> { throw new RuntimeException(); }); 161 checkCompletedExceptionally(cf); 162 cf = runAsync(() -> { throw new RuntimeException(); }, commonPool()); 163 checkCompletedExceptionally(cf); 164 cf = runAsync(() -> { throw new RuntimeException(); }, executor); 165 checkCompletedExceptionally(cf); 166 } catch (Throwable t) { unexpected(t); } 167 168 //---------------------------------------------------------------- 169 // explicit completion 170 //---------------------------------------------------------------- 171 try { 172 final Phaser phaser = new Phaser(1); 173 final int phase = phaser.getPhase(); 174 CompletableFuture<Integer> cf; 175 cf = supplyAsync(() -> { phaser.awaitAdvance(phase); return 1; }); 176 cf.complete(2); 177 phaser.arrive(); 178 checkCompletedNormally(cf, 2); 179 180 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+1); return 1; }); 181 cf.completeExceptionally(new Throwable()); 182 phaser.arrive(); 183 checkCompletedExceptionally(cf); 184 185 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+2); return 1; }); 186 cf.cancel(true); 187 phaser.arrive(); 188 checkCompletedExceptionally(cf, true); 189 190 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+3); return 1; }); 191 check(cf.getNow(2) == 2); 192 phaser.arrive(); 193 checkCompletedNormally(cf, 1); 194 check(cf.getNow(2) == 1); 195 } catch (Throwable t) { unexpected(t); } 196 197 //---------------------------------------------------------------- 198 // thenApplyXXX tests 199 //---------------------------------------------------------------- 200 try { 201 CompletableFuture<Integer> cf2; 202 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string"); 203 cf2 = cf1.thenApply(x -> x.equals("a test string") ? 1 : 0); 204 checkCompletedNormally(cf1, "a test string"); 205 checkCompletedNormally(cf2, 1); 206 207 cf1 = supplyAsync(() -> "a test string"); 208 cf2 = cf1.thenApplyAsync(x -> x.equals("a test string") ? 1 : 0); 209 checkCompletedNormally(cf1, "a test string"); 210 checkCompletedNormally(cf2, 1); 211 212 cf1 = supplyAsync(() -> "a test string"); 213 cf2 = cf1.thenApplyAsync(x -> x.equals("a test string") ? 1 : 0, executor); 214 checkCompletedNormally(cf1, "a test string"); 215 checkCompletedNormally(cf2, 1); 216 217 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 218 cf2 = cf1.thenApply(x -> 0); 219 checkCompletedExceptionally(cf1); 220 checkCompletedExceptionally(cf2); 221 222 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 223 cf2 = cf1.thenApplyAsync(x -> 0); 224 checkCompletedExceptionally(cf1); 225 checkCompletedExceptionally(cf2); 226 227 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 228 cf2 = cf1.thenApplyAsync(x -> 0, executor); 229 checkCompletedExceptionally(cf1); 230 checkCompletedExceptionally(cf2); 231 } catch (Throwable t) { unexpected(t); } 232 233 //---------------------------------------------------------------- 234 // thenAcceptXXX tests 235 //---------------------------------------------------------------- 236 try { 237 CompletableFuture<Void> cf2; 238 int before = atomicInt.get(); 239 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string"); 240 cf2 = cf1.thenAccept(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }); 241 checkCompletedNormally(cf1, "a test string"); 242 checkCompletedNormally(cf2, null); 243 check(atomicInt.get() == (before + 1)); 244 245 before = atomicInt.get(); 246 cf1 = supplyAsync(() -> "a test string"); 247 cf2 = cf1.thenAcceptAsync(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }); 248 checkCompletedNormally(cf1, "a test string"); 249 checkCompletedNormally(cf2, null); 250 check(atomicInt.get() == (before + 1)); 251 252 before = atomicInt.get(); 253 cf1 = supplyAsync(() -> "a test string"); 254 cf2 = cf1.thenAcceptAsync(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }, executor); 255 checkCompletedNormally(cf1, "a test string"); 256 checkCompletedNormally(cf2, null); 257 check(atomicInt.get() == (before + 1)); 258 259 before = atomicInt.get(); 260 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 261 cf2 = cf1.thenAccept(x -> atomicInt.incrementAndGet()); 262 checkCompletedExceptionally(cf1); 263 checkCompletedExceptionally(cf2); 264 check(atomicInt.get() == before); 265 266 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 267 cf2 = cf1.thenAcceptAsync(x -> atomicInt.incrementAndGet()); 268 checkCompletedExceptionally(cf1); 269 checkCompletedExceptionally(cf2); 270 check(atomicInt.get() == before); 271 272 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 273 cf2 = cf1.thenAcceptAsync(x -> atomicInt.incrementAndGet(), executor ); 274 checkCompletedExceptionally(cf1); 275 checkCompletedExceptionally(cf2); 276 check(atomicInt.get() == before); 277 } catch (Throwable t) { unexpected(t); } 278 279 //---------------------------------------------------------------- 280 // thenRunXXX tests 281 //---------------------------------------------------------------- 282 try { 283 CompletableFuture<Void> cf2; 284 int before = atomicInt.get(); 285 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string"); 286 cf2 = cf1.thenRun(() -> atomicInt.incrementAndGet()); 287 checkCompletedNormally(cf1, "a test string"); 288 checkCompletedNormally(cf2, null); 289 check(atomicInt.get() == (before + 1)); 290 291 before = atomicInt.get(); 292 cf1 = supplyAsync(() -> "a test string"); 293 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet()); 294 checkCompletedNormally(cf1, "a test string"); 295 checkCompletedNormally(cf2, null); 296 check(atomicInt.get() == (before + 1)); 297 298 before = atomicInt.get(); 299 cf1 = supplyAsync(() -> "a test string"); 300 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet(), executor); 301 checkCompletedNormally(cf1, "a test string"); 302 checkCompletedNormally(cf2, null); 303 check(atomicInt.get() == (before + 1)); 304 305 before = atomicInt.get(); 306 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 307 cf2 = cf1.thenRun(() -> atomicInt.incrementAndGet()); 308 checkCompletedExceptionally(cf1); 309 checkCompletedExceptionally(cf2); 310 check(atomicInt.get() == before); 311 312 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 313 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet()); 314 checkCompletedExceptionally(cf1); 315 checkCompletedExceptionally(cf2); 316 check(atomicInt.get() == before); 317 318 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 319 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet(), executor); 320 checkCompletedExceptionally(cf1); 321 checkCompletedExceptionally(cf2); 322 check(atomicInt.get() == before); 323 } catch (Throwable t) { unexpected(t); } 324 325 //---------------------------------------------------------------- 326 // thenCombineXXX tests 327 //---------------------------------------------------------------- 328 try { 329 CompletableFuture<Integer> cf3; 330 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 331 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1); 332 cf3 = cf1.thenCombine(cf2, (x, y) -> x + y); 333 checkCompletedNormally(cf1, 1); 334 checkCompletedNormally(cf2, 1); 335 checkCompletedNormally(cf3, 2); 336 337 cf1 = supplyAsync(() -> 1); 338 cf2 = supplyAsync(() -> 1); 339 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> x + y); 340 checkCompletedNormally(cf1, 1); 341 checkCompletedNormally(cf2, 1); 342 checkCompletedNormally(cf3, 2); 343 344 cf1 = supplyAsync(() -> 1); 345 cf2 = supplyAsync(() -> 1); 346 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> x + y, executor); 347 checkCompletedNormally(cf1, 1); 348 checkCompletedNormally(cf2, 1); 349 checkCompletedNormally(cf3, 2); 350 351 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 352 cf2 = supplyAsync(() -> 1); 353 cf3 = cf1.thenCombine(cf2, (x, y) -> 0); 354 checkCompletedExceptionally(cf1); 355 checkCompletedNormally(cf2, 1); 356 checkCompletedExceptionally(cf3); 357 358 cf1 = supplyAsync(() -> 1); 359 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 360 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> 0); 361 checkCompletedNormally(cf1, 1); 362 checkCompletedExceptionally(cf2); 363 checkCompletedExceptionally(cf3); 364 365 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 366 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 367 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> 0, executor); 368 checkCompletedExceptionally(cf1); 369 checkCompletedExceptionally(cf2); 370 checkCompletedExceptionally(cf3); 371 } catch (Throwable t) { unexpected(t); } 372 373 //---------------------------------------------------------------- 374 // thenAcceptBothXXX tests 375 //---------------------------------------------------------------- 376 try { 377 CompletableFuture<Void> cf3; 378 int before = atomicInt.get(); 379 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 380 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1); 381 cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }); 382 checkCompletedNormally(cf1, 1); 383 checkCompletedNormally(cf2, 1); 384 checkCompletedNormally(cf3, null); 385 check(atomicInt.get() == (before + 1)); 386 387 before = atomicInt.get(); 388 cf1 = supplyAsync(() -> 1); 389 cf2 = supplyAsync(() -> 1); 390 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }); 391 checkCompletedNormally(cf1, 1); 392 checkCompletedNormally(cf2, 1); 393 checkCompletedNormally(cf3, null); 394 check(atomicInt.get() == (before + 1)); 395 396 before = atomicInt.get(); 397 cf1 = supplyAsync(() -> 1); 398 cf2 = supplyAsync(() -> 1); 399 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }, executor); 400 checkCompletedNormally(cf1, 1); 401 checkCompletedNormally(cf2, 1); 402 checkCompletedNormally(cf3, null); 403 check(atomicInt.get() == (before + 1)); 404 405 before = atomicInt.get(); 406 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 407 cf2 = supplyAsync(() -> 1); 408 cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> atomicInt.incrementAndGet()); 409 checkCompletedExceptionally(cf1); 410 checkCompletedNormally(cf2, 1); 411 checkCompletedExceptionally(cf3); 412 check(atomicInt.get() == before); 413 414 cf1 = supplyAsync(() -> 1); 415 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 416 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> atomicInt.incrementAndGet()); 417 checkCompletedNormally(cf1, 1); 418 checkCompletedExceptionally(cf2); 419 checkCompletedExceptionally(cf3); 420 check(atomicInt.get() == before); 421 422 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 423 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 424 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> atomicInt.incrementAndGet(), executor); 425 checkCompletedExceptionally(cf1); 426 checkCompletedExceptionally(cf2); 427 checkCompletedExceptionally(cf3); 428 check(atomicInt.get() == before); 429 } catch (Throwable t) { unexpected(t); } 430 431 //---------------------------------------------------------------- 432 // runAfterBothXXX tests 433 //---------------------------------------------------------------- 434 try { 435 CompletableFuture<Void> cf3; 436 int before = atomicInt.get(); 437 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 438 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1); 439 cf3 = cf1.runAfterBoth(cf2, () -> { check(cf1.isDone()); check(cf2.isDone()); atomicInt.incrementAndGet(); }); 440 checkCompletedNormally(cf1, 1); 441 checkCompletedNormally(cf2, 1); 442 checkCompletedNormally(cf3, null); 443 check(atomicInt.get() == (before + 1)); 444 445 before = atomicInt.get(); 446 CompletableFuture<Integer> cfa = supplyAsync(() -> 1); 447 CompletableFuture<Integer> cfb = supplyAsync(() -> 1); 448 cf3 = cfa.runAfterBothAsync(cfb, () -> { check(cfa.isDone()); check(cfb.isDone()); atomicInt.incrementAndGet(); }); 449 checkCompletedNormally(cfa, 1); 450 checkCompletedNormally(cfb, 1); 451 checkCompletedNormally(cf3, null); 452 check(atomicInt.get() == (before + 1)); 453 454 before = atomicInt.get(); 455 CompletableFuture<Integer> cfx = supplyAsync(() -> 1); 456 CompletableFuture<Integer> cfy = supplyAsync(() -> 1); 457 cf3 = cfy.runAfterBothAsync(cfx, () -> { check(cfx.isDone()); check(cfy.isDone()); atomicInt.incrementAndGet(); }, executor); 458 checkCompletedNormally(cfx, 1); 459 checkCompletedNormally(cfy, 1); 460 checkCompletedNormally(cf3, null); 461 check(atomicInt.get() == (before + 1)); 462 463 before = atomicInt.get(); 464 CompletableFuture<Integer> cf4 = supplyAsync(() -> { throw new RuntimeException(); }); 465 CompletableFuture<Integer> cf5 = supplyAsync(() -> 1); 466 cf3 = cf5.runAfterBothAsync(cf4, () -> atomicInt.incrementAndGet(), executor); 467 checkCompletedExceptionally(cf4); 468 checkCompletedNormally(cf5, 1); 469 checkCompletedExceptionally(cf3); 470 check(atomicInt.get() == before); 471 472 before = atomicInt.get(); 473 cf4 = supplyAsync(() -> 1); 474 cf5 = supplyAsync(() -> { throw new RuntimeException(); }); 475 cf3 = cf5.runAfterBothAsync(cf4, () -> atomicInt.incrementAndGet()); 476 checkCompletedNormally(cf4, 1); 477 checkCompletedExceptionally(cf5); 478 checkCompletedExceptionally(cf3); 479 check(atomicInt.get() == before); 480 481 before = atomicInt.get(); 482 cf4 = supplyAsync(() -> { throw new RuntimeException(); }); 483 cf5 = supplyAsync(() -> { throw new RuntimeException(); }); 484 cf3 = cf5.runAfterBoth(cf4, () -> atomicInt.incrementAndGet()); 485 checkCompletedExceptionally(cf4); 486 checkCompletedExceptionally(cf5); 487 checkCompletedExceptionally(cf3); 488 check(atomicInt.get() == before); 489 } catch (Throwable t) { unexpected(t); } 490 491 //---------------------------------------------------------------- 492 // applyToEitherXXX tests 493 //---------------------------------------------------------------- 494 try { 495 CompletableFuture<Integer> cf3; 496 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 497 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2); 498 cf3 = cf1.applyToEither(cf2, x -> { check(x == 1 || x == 2); return x; }); 499 checkCompletedNormally(cf3, new Object[] {1, 2}); 500 check(cf1.isDone() || cf2.isDone()); 501 502 cf1 = supplyAsync(() -> 1); 503 cf2 = supplyAsync(() -> 2); 504 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1 || x == 2); return x; }); 505 checkCompletedNormally(cf3, new Object[] {1, 2}); 506 check(cf1.isDone() || cf2.isDone()); 507 508 cf1 = supplyAsync(() -> 1); 509 cf2 = supplyAsync(() -> 2); 510 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1 || x == 2); return x; }, executor); 511 checkCompletedNormally(cf3, new Object[] {1, 2}); 512 check(cf1.isDone() || cf2.isDone()); 513 514 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 515 cf2 = supplyAsync(() -> 2); 516 cf3 = cf1.applyToEither(cf2, x -> { check(x == 2); return x; }); 517 try { check(cf3.join() == 2); } catch (CompletionException x) { pass(); } 518 check(cf3.isDone()); 519 check(cf1.isDone() || cf2.isDone()); 520 521 cf1 = supplyAsync(() -> 1); 522 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 523 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1); return x; }); 524 try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); } 525 check(cf3.isDone()); 526 check(cf1.isDone() || cf2.isDone()); 527 528 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 529 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 530 cf3 = cf1.applyToEitherAsync(cf2, x -> { fail(); return x; }); 531 checkCompletedExceptionally(cf3); 532 check(cf1.isDone() || cf2.isDone()); 533 534 final Phaser cf3Done = new Phaser(2); 535 cf1 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 1; }); 536 cf2 = supplyAsync(() -> 2); 537 cf3 = cf1.applyToEither(cf2, x -> { check(x == 2); return x; }); 538 checkCompletedNormally(cf3, 2); 539 checkCompletedNormally(cf2, 2); 540 check(!cf1.isDone()); 541 cf3Done.arrive(); 542 checkCompletedNormally(cf1, 1); 543 checkCompletedNormally(cf3, 2); 544 545 cf1 = supplyAsync(() -> 1); 546 cf2 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 2; }); 547 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1); return x; }); 548 checkCompletedNormally(cf3, 1); 549 checkCompletedNormally(cf1, 1); 550 check(!cf2.isDone()); 551 cf3Done.arrive(); 552 checkCompletedNormally(cf2, 2); 553 checkCompletedNormally(cf3, 1); 554 } catch (Throwable t) { unexpected(t); } 555 556 //---------------------------------------------------------------- 557 // acceptEitherXXX tests 558 //---------------------------------------------------------------- 559 try { 560 CompletableFuture<Void> cf3; 561 int before = atomicInt.get(); 562 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 563 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2); 564 cf3 = cf1.acceptEither(cf2, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }); 565 checkCompletedNormally(cf3, null); 566 check(cf1.isDone() || cf2.isDone()); 567 check(atomicInt.get() == (before + 1)); 568 569 before = atomicInt.get(); 570 cf1 = supplyAsync(() -> 1); 571 cf2 = supplyAsync(() -> 2); 572 cf3 = cf1.acceptEitherAsync(cf2, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }); 573 checkCompletedNormally(cf3, null); 574 check(cf1.isDone() || cf2.isDone()); 575 check(atomicInt.get() == (before + 1)); 576 577 before = atomicInt.get(); 578 cf1 = supplyAsync(() -> 1); 579 cf2 = supplyAsync(() -> 2); 580 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }, executor); 581 checkCompletedNormally(cf3, null); 582 check(cf1.isDone() || cf2.isDone()); 583 check(atomicInt.get() == (before + 1)); 584 585 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 586 cf2 = supplyAsync(() -> 2); 587 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 2); }, executor); 588 try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } 589 check(cf3.isDone()); 590 check(cf1.isDone() || cf2.isDone()); 591 592 cf1 = supplyAsync(() -> 1); 593 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 594 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 1); }); 595 try { check(cf3.join() == null); } catch (CompletionException x) { pass(); } 596 check(cf3.isDone()); 597 check(cf1.isDone() || cf2.isDone()); 598 599 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 600 cf2 = supplyAsync(() -> { throw new RuntimeException(); }); 601 cf3 = cf2.acceptEitherAsync(cf1, x -> { fail(); }); 602 checkCompletedExceptionally(cf3); 603 check(cf1.isDone() || cf2.isDone()); 604 605 final Phaser cf3Done = new Phaser(2); 606 cf1 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 1; }); 607 cf2 = supplyAsync(() -> 2); 608 cf3 = cf1.acceptEither(cf2, x -> { check(x == 2); }); 609 checkCompletedNormally(cf3, null); 610 checkCompletedNormally(cf2, 2); 611 check(!cf1.isDone()); 612 cf3Done.arrive(); 613 checkCompletedNormally(cf1, 1); 614 checkCompletedNormally(cf3, null); 615 616 cf1 = supplyAsync(() -> 1); 617 cf2 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 2; }); 618 cf3 = cf1.acceptEitherAsync(cf2, x -> { check(x == 1); }); 619 checkCompletedNormally(cf3, null); 620 checkCompletedNormally(cf1, 1); 621 check(!cf2.isDone()); 622 cf3Done.arrive(); 623 checkCompletedNormally(cf2, 2); 624 checkCompletedNormally(cf3, null); 625 } catch (Throwable t) { unexpected(t); } 626 627 //---------------------------------------------------------------- 628 // runAfterEitherXXX tests 629 //---------------------------------------------------------------- 630 try { 631 CompletableFuture<Void> cf3; 632 int before = atomicInt.get(); 633 CompletableFuture<Void> cf1 = runAsync(() -> {}); 634 CompletableFuture<Void> cf2 = runAsync(() -> {}); 635 cf3 = cf1.runAfterEither(cf2, () -> atomicInt.incrementAndGet()); 636 checkCompletedNormally(cf3, null); 637 check(cf1.isDone() || cf2.isDone()); 638 check(atomicInt.get() == (before + 1)); 639 640 before = atomicInt.get(); 641 cf1 = runAsync(() -> {}); 642 cf2 = runAsync(() -> {}); 643 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet()); 644 checkCompletedNormally(cf3, null); 645 check(cf1.isDone() || cf2.isDone()); 646 check(atomicInt.get() == (before + 1)); 647 648 before = atomicInt.get(); 649 cf1 = runAsync(() -> {}); 650 cf2 = runAsync(() -> {}); 651 cf3 = cf2.runAfterEitherAsync(cf1, () -> atomicInt.incrementAndGet(), executor); 652 checkCompletedNormally(cf3, null); 653 check(cf1.isDone() || cf2.isDone()); 654 check(atomicInt.get() == (before + 1)); 655 656 before = atomicInt.get(); 657 cf1 = runAsync(() -> { throw new RuntimeException(); }); 658 cf2 = runAsync(() -> {}); 659 cf3 = cf2.runAfterEither(cf1, () -> atomicInt.incrementAndGet()); 660 try { 661 check(cf3.join() == null); 662 check(atomicInt.get() == (before + 1)); 663 } catch (CompletionException x) { pass(); } 664 check(cf3.isDone()); 665 check(cf1.isDone() || cf2.isDone()); 666 667 before = atomicInt.get(); 668 cf1 = runAsync(() -> {}); 669 cf2 = runAsync(() -> { throw new RuntimeException(); }); 670 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet()); 671 try { 672 check(cf3.join() == null); 673 check(atomicInt.get() == (before + 1)); 674 } catch (CompletionException x) { pass(); } 675 check(cf3.isDone()); 676 check(cf1.isDone() || cf2.isDone()); 677 678 before = atomicInt.get(); 679 cf1 = runAsync(() -> { throw new RuntimeException(); }); 680 cf2 = runAsync(() -> { throw new RuntimeException(); }); 681 cf3 = cf2.runAfterEitherAsync(cf1, () -> atomicInt.incrementAndGet(), executor); 682 checkCompletedExceptionally(cf3); 683 check(cf1.isDone() || cf2.isDone()); 684 check(atomicInt.get() == before); 685 686 final Phaser cf3Done = new Phaser(2); 687 before = atomicInt.get(); 688 cf1 = runAsync(() -> cf3Done.arriveAndAwaitAdvance()); 689 cf2 = runAsync(() -> {}); 690 cf3 = cf1.runAfterEither(cf2, () -> atomicInt.incrementAndGet()); 691 checkCompletedNormally(cf3, null); 692 checkCompletedNormally(cf2, null); 693 check(!cf1.isDone()); 694 check(atomicInt.get() == (before + 1)); 695 cf3Done.arrive(); 696 checkCompletedNormally(cf1, null); 697 checkCompletedNormally(cf3, null); 698 699 before = atomicInt.get(); 700 cf1 = runAsync(() -> {}); 701 cf2 = runAsync(() -> cf3Done.arriveAndAwaitAdvance()); 702 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet()); 703 checkCompletedNormally(cf3, null); 704 checkCompletedNormally(cf1, null); 705 check(!cf2.isDone()); 706 check(atomicInt.get() == (before + 1)); 707 cf3Done.arrive(); 708 checkCompletedNormally(cf2, null); 709 checkCompletedNormally(cf3, null); 710 } catch (Throwable t) { unexpected(t); } 711 712 //---------------------------------------------------------------- 713 // thenComposeXXX tests 714 //---------------------------------------------------------------- 715 try { 716 CompletableFuture<Integer> cf2; 717 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 718 cf2 = cf1.thenCompose(x -> { check(x == 1); return CompletableFuture.completedFuture(2); }); 719 checkCompletedNormally(cf1, 1); 720 checkCompletedNormally(cf2, 2); 721 722 cf1 = supplyAsync(() -> 1); 723 cf2 = cf1.thenComposeAsync(x -> { check(x == 1); return CompletableFuture.completedFuture(2); }); 724 checkCompletedNormally(cf1, 1); 725 checkCompletedNormally(cf2, 2); 726 727 cf1 = supplyAsync(() -> 1); 728 cf2 = cf1.thenComposeAsync(x -> { check(x == 1); return CompletableFuture.completedFuture(2); }, executor); 729 checkCompletedNormally(cf1, 1); 730 checkCompletedNormally(cf2, 2); 731 732 int before = atomicInt.get(); 733 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 734 cf2 = cf1.thenCompose(x -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); }); 735 checkCompletedExceptionally(cf1); 736 checkCompletedExceptionally(cf2); 737 check(atomicInt.get() == before); 738 739 cf1 = supplyAsync(() -> { throw new RuntimeException(); }); 740 cf2 = cf1.thenComposeAsync(x -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); }); 741 checkCompletedExceptionally(cf1); 742 checkCompletedExceptionally(cf2); 743 check(atomicInt.get() == before); 744 745 cf1 = supplyAsync(() -> 1); 746 cf2 = cf1.thenComposeAsync(x -> { throw new RuntimeException(); }, executor); 747 checkCompletedNormally(cf1, 1); 748 checkCompletedExceptionally(cf2); 749 } catch (Throwable t) { unexpected(t); } 750 751 //---------------------------------------------------------------- 752 // anyOf tests 753 //---------------------------------------------------------------- 754 try { 755 CompletableFuture<Object> cf3; 756 for (int k=0; k < 10; k++){ 757 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 758 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2); 759 cf3 = CompletableFuture.anyOf(cf1, cf2); 760 checkCompletedNormally(cf3, new Object[] {1, 2}); 761 check(cf1.isDone() || cf2.isDone()); 762 } 763 } catch (Throwable t) { unexpected(t); } 764 765 //---------------------------------------------------------------- 766 // allOf tests 767 //---------------------------------------------------------------- 768 try { 769 CompletableFuture<?> cf3; 770 for (int k=0; k < 10; k++){ 771 CompletableFuture<Integer>[] cfs = (CompletableFuture<Integer>[]) 772 Array.newInstance(CompletableFuture.class, 10); 773 for (int j=0; j < 10; j++) { 774 final int v = j; 775 cfs[j] = supplyAsync(() -> v); 776 } 777 cf3 = CompletableFuture.allOf(cfs); 778 for (int j=0; j < 10; j++) 779 checkCompletedNormally(cfs[j], j); 780 checkCompletedNormally(cf3, null); 781 } 782 } catch (Throwable t) { unexpected(t); } 783 784 //---------------------------------------------------------------- 785 // exceptionally tests 786 //---------------------------------------------------------------- 787 try { 788 CompletableFuture<Integer> cf2; 789 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 790 cf2 = cf1.exceptionally(t -> { fail("function should never be called"); return 2;}); 791 checkCompletedNormally(cf1, 1); 792 checkCompletedNormally(cf2, 1); 793 794 final RuntimeException t = new RuntimeException(); 795 cf1 = supplyAsync(() -> { throw t; }); 796 cf2 = cf1.exceptionally(x -> { check(x.getCause() == t); return 2;}); 797 checkCompletedExceptionally(cf1); 798 checkCompletedNormally(cf2, 2); 799 } catch (Throwable t) { unexpected(t); } 800 801 //---------------------------------------------------------------- 802 // handle tests 803 //---------------------------------------------------------------- 804 try { 805 CompletableFuture<Integer> cf2; 806 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 807 cf2 = cf1.handle((x,t) -> x+1); 808 checkCompletedNormally(cf1, 1); 809 checkCompletedNormally(cf2, 2); 810 811 final RuntimeException ex = new RuntimeException(); 812 cf1 = supplyAsync(() -> { throw ex; }); 813 cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;}); 814 checkCompletedExceptionally(cf1); 815 checkCompletedNormally(cf2, 2); 816 817 cf1 = supplyAsync(() -> 1); 818 cf2 = cf1.handleAsync((x,t) -> x+1); 819 checkCompletedNormally(cf1, 1); 820 checkCompletedNormally(cf2, 2); 821 822 cf1 = supplyAsync(() -> { throw ex; }); 823 cf2 = cf1.handleAsync((x,t) -> { check(t.getCause() == ex); return 2;}); 824 checkCompletedExceptionally(cf1); 825 checkCompletedNormally(cf2, 2); 826 } catch (Throwable t) { unexpected(t); } 827 828 //---------------------------------------------------------------- 829 // whenComplete tests 830 //---------------------------------------------------------------- 831 try { 832 AtomicInteger count = new AtomicInteger(); 833 CompletableFuture<Integer> cf2; 834 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1); 835 cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement()); 836 checkCompletedNormally(cf1, 1); 837 checkCompletedNormally(cf2, 1); 838 check(count.get() == 1, "action count should be incremented"); 839 840 final RuntimeException ex = new RuntimeException(); 841 cf1 = supplyAsync(() -> { throw ex; }); 842 cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement()); 843 checkCompletedExceptionally(cf1); 844 checkCompletedExceptionally(cf2); 845 check(count.get() == 2, "action count should be incremented"); 846 847 cf1 = supplyAsync(() -> 1); 848 cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement()); 849 checkCompletedNormally(cf1, 1); 850 checkCompletedNormally(cf2, 1); 851 check(count.get() == 3, "action count should be incremented"); 852 853 cf1 = supplyAsync(() -> { throw ex; }); 854 cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement()); 855 checkCompletedExceptionally(cf1); 856 checkCompletedExceptionally(cf2); 857 check(count.get() == 4, "action count should be incremented"); 858 859 } catch (Throwable t) { unexpected(t); } 860 861 } 862 863 //--------------------- Infrastructure --------------------------- 864 static volatile int passed = 0, failed = 0; 865 static void pass() {passed++;} 866 static void fail() {failed++; Thread.dumpStack();} 867 static void fail(String msg) {System.out.println(msg); fail();} 868 static void unexpected(Throwable t) {failed++; t.printStackTrace();} 869 static void check(boolean cond) {if (cond) pass(); else fail();} 870 static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);} 871 static void equal(Object x, Object y) { 872 if (x == null ? y == null : x.equals(y)) pass(); 873 else fail(x + " not equal to " + y);} 874 static void equalAnyOf(Object x, Object[] y) { 875 if (x == null && y == null) { pass(); return; } 876 for (Object z : y) { if (x.equals(z)) { pass(); return; } } 877 StringBuilder sb = new StringBuilder(); 878 for (Object o : y) 879 sb.append(o).append(" "); 880 fail(x + " not equal to one of [" + sb + "]");} 881 public static void main(String[] args) throws Throwable { 882 try {realMain(args);} catch (Throwable t) {unexpected(t);} 883 System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); 884 if (failed > 0) throw new AssertionError("Some tests failed");} 885 }