1 /* 2 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @bug 4607272 6842687 6878369 6944810 7023403 26 * @summary Unit test for AsynchronousSocketChannel 27 * @run main Basic -skipSlowConnectTest 28 */ 29 30 import java.nio.ByteBuffer; 31 import java.nio.channels.*; 32 import static java.net.StandardSocketOptions.*; 33 import java.net.*; 34 import java.util.Random; 35 import java.util.concurrent.*; 36 import java.util.concurrent.atomic.*; 37 import java.io.Closeable; 38 import java.io.IOException; 39 40 public class Basic { 41 static final Random rand = new Random(); 42 43 static boolean skipSlowConnectTest = false; 44 45 public static void main(String[] args) throws Exception { 46 for (String arg: args) { 47 switch (arg) { 48 case "-skipSlowConnectTest" : 49 skipSlowConnectTest = true; 50 break; 51 default: 52 throw new RuntimeException("Unrecognized argument: " + arg); 53 } 54 } 55 56 testBind(); 57 testSocketOptions(); 58 testConnect(); 59 testCloseWhenPending(); 60 testCancel(); 61 testRead1(); 62 testRead2(); 63 testRead3(); 64 testWrite1(); 65 testWrite2(); 66 // skip timeout tests until 7052549 is fixed 67 if (!System.getProperty("os.name").startsWith("Windows")) 68 testTimeout(); 69 testShutdown(); 70 } 71 72 static class Server implements Closeable { 73 private final ServerSocketChannel ssc; 74 private final InetSocketAddress address; 75 76 Server() throws IOException { 77 ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 78 79 InetAddress lh = InetAddress.getLocalHost(); 80 int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 81 address = new InetSocketAddress(lh, port); 82 } 83 84 InetSocketAddress address() { 85 return address; 86 } 87 88 SocketChannel accept() throws IOException { 89 return ssc.accept(); 90 } 91 92 public void close() throws IOException { 93 ssc.close(); 94 } 95 96 } 97 98 static void testBind() throws Exception { 99 System.out.println("-- bind --"); 100 101 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 102 if (ch.getLocalAddress() != null) 103 throw new RuntimeException("Local address should be 'null'"); 104 ch.bind(new InetSocketAddress(0)); 105 106 // check local address after binding 107 InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 108 if (local.getPort() == 0) 109 throw new RuntimeException("Unexpected port"); 110 if (!local.getAddress().isAnyLocalAddress()) 111 throw new RuntimeException("Not bound to a wildcard address"); 112 113 // try to re-bind 114 try { 115 ch.bind(new InetSocketAddress(0)); 116 throw new RuntimeException("AlreadyBoundException expected"); 117 } catch (AlreadyBoundException x) { 118 } 119 } 120 121 // check ClosedChannelException 122 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 123 ch.close(); 124 try { 125 ch.bind(new InetSocketAddress(0)); 126 throw new RuntimeException("ClosedChannelException expected"); 127 } catch (ClosedChannelException x) { 128 } 129 } 130 131 static void testSocketOptions() throws Exception { 132 System.out.println("-- socket options --"); 133 134 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 135 ch.setOption(SO_RCVBUF, 128*1024) 136 .setOption(SO_SNDBUF, 128*1024) 137 .setOption(SO_REUSEADDR, true); 138 139 // check SO_SNDBUF/SO_RCVBUF limits 140 int before, after; 141 before = ch.getOption(SO_SNDBUF); 142 after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 143 if (after < before) 144 throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 145 before = ch.getOption(SO_RCVBUF); 146 after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 147 if (after < before) 148 throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 149 150 ch.bind(new InetSocketAddress(0)); 151 152 // default values 153 if (ch.getOption(SO_KEEPALIVE)) 154 throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 155 if (ch.getOption(TCP_NODELAY)) 156 throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 157 158 // set and check 159 if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 160 throw new RuntimeException("SO_KEEPALIVE did not change"); 161 if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 162 throw new RuntimeException("SO_KEEPALIVE did not change"); 163 164 // read others (can't check as actual value is implementation dependent) 165 ch.getOption(SO_RCVBUF); 166 ch.getOption(SO_SNDBUF); 167 } 168 } 169 170 static void testConnect() throws Exception { 171 System.out.println("-- connect --"); 172 173 SocketAddress address; 174 175 try (Server server = new Server()) { 176 address = server.address(); 177 178 // connect to server and check local/remote addresses 179 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 180 ch.connect(address).get(); 181 // check local address 182 if (ch.getLocalAddress() == null) 183 throw new RuntimeException("Not bound to local address"); 184 185 // check remote address 186 InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 187 if (remote.getPort() != server.address().getPort()) 188 throw new RuntimeException("Connected to unexpected port"); 189 if (!remote.getAddress().equals(server.address().getAddress())) 190 throw new RuntimeException("Connected to unexpected address"); 191 192 // try to connect again 193 try { 194 ch.connect(server.address()).get(); 195 throw new RuntimeException("AlreadyConnectedException expected"); 196 } catch (AlreadyConnectedException x) { 197 } 198 199 // clean-up 200 server.accept().close(); 201 } 202 203 // check that connect fails with ClosedChannelException 204 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 205 ch.close(); 206 try { 207 ch.connect(server.address()).get(); 208 throw new RuntimeException("ExecutionException expected"); 209 } catch (ExecutionException x) { 210 if (!(x.getCause() instanceof ClosedChannelException)) 211 throw new RuntimeException("Cause of ClosedChannelException expected"); 212 } 213 final AtomicReference<Throwable> connectException = new AtomicReference<>(); 214 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 215 public void completed(Void result, Void att) { 216 } 217 public void failed(Throwable exc, Void att) { 218 connectException.set(exc); 219 } 220 }); 221 while (connectException.get() == null) { 222 Thread.sleep(100); 223 } 224 if (!(connectException.get() instanceof ClosedChannelException)) 225 throw new RuntimeException("ClosedChannelException expected"); 226 } 227 228 // test that failure to connect closes the channel 229 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 230 try { 231 ch.connect(address).get(); 232 } catch (ExecutionException x) { 233 // failed to establish connection 234 if (ch.isOpen()) 235 throw new RuntimeException("Channel should be closed"); 236 } 237 } 238 239 // repeat test by connecting to a (probably) non-existent host. This 240 // improves the chance that the connect will not fail immediately. 241 if (!skipSlowConnectTest) { 242 try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 243 try { 244 ch.connect(genSocketAddress()).get(); 245 } catch (ExecutionException x) { 246 // failed to establish connection 247 if (ch.isOpen()) 248 throw new RuntimeException("Channel should be closed"); 249 } 250 } 251 } 252 } 253 254 static void testCloseWhenPending() throws Exception { 255 System.out.println("-- asynchronous close when connecting --"); 256 257 AsynchronousSocketChannel ch; 258 259 // asynchronous close while connecting 260 ch = AsynchronousSocketChannel.open(); 261 Future<Void> connectResult = ch.connect(genSocketAddress()); 262 263 // give time to initiate the connect (SYN) 264 Thread.sleep(50); 265 266 // close 267 ch.close(); 268 269 // check that exception is thrown in timely manner 270 try { 271 connectResult.get(5, TimeUnit.SECONDS); 272 } catch (TimeoutException x) { 273 throw new RuntimeException("AsynchronousCloseException not thrown"); 274 } catch (ExecutionException x) { 275 // expected 276 } 277 278 System.out.println("-- asynchronous close when reading --"); 279 280 try (Server server = new Server()) { 281 ch = AsynchronousSocketChannel.open(); 282 ch.connect(server.address()).get(); 283 284 ByteBuffer dst = ByteBuffer.allocateDirect(100); 285 Future<Integer> result = ch.read(dst); 286 287 // attempt a second read - should fail with ReadPendingException 288 ByteBuffer buf = ByteBuffer.allocateDirect(100); 289 try { 290 ch.read(buf); 291 throw new RuntimeException("ReadPendingException expected"); 292 } catch (ReadPendingException x) { 293 } 294 295 // close channel (should cause initial read to complete) 296 ch.close(); 297 server.accept().close(); 298 299 // check that AsynchronousCloseException is thrown 300 try { 301 result.get(); 302 throw new RuntimeException("Should not read"); 303 } catch (ExecutionException x) { 304 if (!(x.getCause() instanceof AsynchronousCloseException)) 305 throw new RuntimeException(x); 306 } 307 308 System.out.println("-- asynchronous close when writing --"); 309 310 ch = AsynchronousSocketChannel.open(); 311 ch.connect(server.address()).get(); 312 313 final AtomicReference<Throwable> writeException = 314 new AtomicReference<Throwable>(); 315 316 // write bytes to fill socket buffer 317 ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 318 public void completed(Integer result, AsynchronousSocketChannel ch) { 319 ch.write(genBuffer(), ch, this); 320 } 321 public void failed(Throwable x, AsynchronousSocketChannel ch) { 322 writeException.set(x); 323 } 324 }); 325 326 // give time for socket buffer to fill up. 327 Thread.sleep(5*1000); 328 329 // attempt a concurrent write - should fail with WritePendingException 330 try { 331 ch.write(genBuffer()); 332 throw new RuntimeException("WritePendingException expected"); 333 } catch (WritePendingException x) { 334 } 335 336 // close channel - should cause initial write to complete 337 ch.close(); 338 server.accept().close(); 339 340 // wait for exception 341 while (writeException.get() == null) { 342 Thread.sleep(100); 343 } 344 if (!(writeException.get() instanceof AsynchronousCloseException)) 345 throw new RuntimeException("AsynchronousCloseException expected"); 346 } 347 } 348 349 static void testCancel() throws Exception { 350 System.out.println("-- cancel --"); 351 352 try (Server server = new Server()) { 353 for (int i=0; i<2; i++) { 354 boolean mayInterruptIfRunning = (i == 0) ? false : true; 355 356 // establish loopback connection 357 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 358 ch.connect(server.address()).get(); 359 SocketChannel peer = server.accept(); 360 361 // start read operation 362 ByteBuffer buf = ByteBuffer.allocate(1); 363 Future<Integer> res = ch.read(buf); 364 365 // cancel operation 366 boolean cancelled = res.cancel(mayInterruptIfRunning); 367 368 // check post-conditions 369 if (!res.isDone()) 370 throw new RuntimeException("isDone should return true"); 371 if (res.isCancelled() != cancelled) 372 throw new RuntimeException("isCancelled not consistent"); 373 try { 374 res.get(); 375 throw new RuntimeException("CancellationException expected"); 376 } catch (CancellationException x) { 377 } 378 try { 379 res.get(1, TimeUnit.SECONDS); 380 throw new RuntimeException("CancellationException expected"); 381 } catch (CancellationException x) { 382 } 383 384 // check that the cancel doesn't impact writing to the channel 385 if (!mayInterruptIfRunning) { 386 buf = ByteBuffer.wrap("a".getBytes()); 387 ch.write(buf).get(); 388 } 389 390 ch.close(); 391 peer.close(); 392 } 393 } 394 } 395 396 static void testRead1() throws Exception { 397 System.out.println("-- read (1) --"); 398 399 try (Server server = new Server()) { 400 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 401 ch.connect(server.address()).get(); 402 403 // read with 0 bytes remaining should complete immediately 404 ByteBuffer buf = ByteBuffer.allocate(1); 405 buf.put((byte)0); 406 int n = ch.read(buf).get(); 407 if (n != 0) 408 throw new RuntimeException("0 expected"); 409 410 // write bytes and close connection 411 ByteBuffer src = genBuffer(); 412 try (SocketChannel sc = server.accept()) { 413 sc.setOption(SO_SNDBUF, src.remaining()); 414 while (src.hasRemaining()) 415 sc.write(src); 416 } 417 418 // reads should complete immediately 419 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 420 final CountDownLatch latch = new CountDownLatch(1); 421 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 422 public void completed(Integer result, Void att) { 423 int n = result; 424 if (n > 0) { 425 ch.read(dst, (Void)null, this); 426 } else { 427 latch.countDown(); 428 } 429 } 430 public void failed(Throwable exc, Void att) { 431 } 432 }); 433 434 latch.await(); 435 436 // check buffers 437 src.flip(); 438 dst.flip(); 439 if (!src.equals(dst)) { 440 throw new RuntimeException("Contents differ"); 441 } 442 443 // close channel 444 ch.close(); 445 446 // check read fails with ClosedChannelException 447 try { 448 ch.read(dst).get(); 449 throw new RuntimeException("ExecutionException expected"); 450 } catch (ExecutionException x) { 451 if (!(x.getCause() instanceof ClosedChannelException)) 452 throw new RuntimeException("Cause of ClosedChannelException expected"); 453 } 454 } 455 } 456 457 static void testRead2() throws Exception { 458 System.out.println("-- read (2) --"); 459 460 try (Server server = new Server()) { 461 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 462 ch.connect(server.address()).get(); 463 SocketChannel sc = server.accept(); 464 465 ByteBuffer src = genBuffer(); 466 467 // read until the buffer is full 468 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 469 final CountDownLatch latch = new CountDownLatch(1); 470 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 471 public void completed(Integer result, Void att) { 472 if (dst.hasRemaining()) { 473 ch.read(dst, (Void)null, this); 474 } else { 475 latch.countDown(); 476 } 477 } 478 public void failed(Throwable exc, Void att) { 479 } 480 }); 481 482 // trickle the writing 483 do { 484 int rem = src.remaining(); 485 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100); 486 ByteBuffer buf = ByteBuffer.allocate(size); 487 for (int i=0; i<size; i++) 488 buf.put(src.get()); 489 buf.flip(); 490 Thread.sleep(50 + rand.nextInt(1500)); 491 while (buf.hasRemaining()) 492 sc.write(buf); 493 } while (src.hasRemaining()); 494 495 // wait until ascynrhonous reading has completed 496 latch.await(); 497 498 // check buffers 499 src.flip(); 500 dst.flip(); 501 if (!src.equals(dst)) { 502 throw new RuntimeException("Contents differ"); 503 } 504 505 sc.close(); 506 ch.close(); 507 } 508 } 509 510 // exercise scattering read 511 static void testRead3() throws Exception { 512 System.out.println("-- read (3) --"); 513 514 try (Server server = new Server()) { 515 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 516 ch.connect(server.address()).get(); 517 SocketChannel sc = server.accept(); 518 519 ByteBuffer[] dsts = new ByteBuffer[3]; 520 for (int i=0; i<dsts.length; i++) { 521 dsts[i] = ByteBuffer.allocateDirect(100); 522 } 523 524 // scattering read that completes ascynhronously 525 final CountDownLatch l1 = new CountDownLatch(1); 526 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 527 new CompletionHandler<Long,Void>() { 528 public void completed(Long result, Void att) { 529 long n = result; 530 if (n <= 0) 531 throw new RuntimeException("No bytes read"); 532 l1.countDown(); 533 } 534 public void failed(Throwable exc, Void att) { 535 } 536 }); 537 538 // write some bytes 539 sc.write(genBuffer()); 540 541 // read should now complete 542 l1.await(); 543 544 // write more bytes 545 sc.write(genBuffer()); 546 547 // read should complete immediately 548 for (int i=0; i<dsts.length; i++) { 549 dsts[i].rewind(); 550 } 551 552 final CountDownLatch l2 = new CountDownLatch(1); 553 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 554 new CompletionHandler<Long,Void>() { 555 public void completed(Long result, Void att) { 556 long n = result; 557 if (n <= 0) 558 throw new RuntimeException("No bytes read"); 559 l2.countDown(); 560 } 561 public void failed(Throwable exc, Void att) { 562 } 563 }); 564 l2.await(); 565 566 ch.close(); 567 sc.close(); 568 } 569 } 570 571 static void testWrite1() throws Exception { 572 System.out.println("-- write (1) --"); 573 574 try (Server server = new Server()) { 575 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 576 ch.connect(server.address()).get(); 577 SocketChannel sc = server.accept(); 578 579 // write with 0 bytes remaining should complete immediately 580 ByteBuffer buf = ByteBuffer.allocate(1); 581 buf.put((byte)0); 582 int n = ch.write(buf).get(); 583 if (n != 0) 584 throw new RuntimeException("0 expected"); 585 586 // write all bytes and close connection when done 587 final ByteBuffer src = genBuffer(); 588 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 589 public void completed(Integer result, Void att) { 590 if (src.hasRemaining()) { 591 ch.write(src, (Void)null, this); 592 } else { 593 try { 594 ch.close(); 595 } catch (IOException ignore) { } 596 } 597 } 598 public void failed(Throwable exc, Void att) { 599 } 600 }); 601 602 // read to EOF or buffer full 603 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 604 do { 605 n = sc.read(dst); 606 } while (n > 0); 607 sc.close(); 608 609 // check buffers 610 src.flip(); 611 dst.flip(); 612 if (!src.equals(dst)) { 613 throw new RuntimeException("Contents differ"); 614 } 615 616 // check write fails with ClosedChannelException 617 try { 618 ch.read(dst).get(); 619 throw new RuntimeException("ExecutionException expected"); 620 } catch (ExecutionException x) { 621 if (!(x.getCause() instanceof ClosedChannelException)) 622 throw new RuntimeException("Cause of ClosedChannelException expected"); 623 } 624 } 625 } 626 627 // exercise gathering write 628 static void testWrite2() throws Exception { 629 System.out.println("-- write (2) --"); 630 631 try (Server server = new Server()) { 632 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 633 ch.connect(server.address()).get(); 634 SocketChannel sc = server.accept(); 635 636 // number of bytes written 637 final AtomicLong bytesWritten = new AtomicLong(0); 638 639 // write buffers (should complete immediately) 640 ByteBuffer[] srcs = genBuffers(1); 641 final CountDownLatch l1 = new CountDownLatch(1); 642 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 643 new CompletionHandler<Long,Void>() { 644 public void completed(Long result, Void att) { 645 long n = result; 646 if (n <= 0) 647 throw new RuntimeException("No bytes read"); 648 bytesWritten.addAndGet(n); 649 l1.countDown(); 650 } 651 public void failed(Throwable exc, Void att) { 652 } 653 }); 654 l1.await(); 655 656 // set to true to signal that no more buffers should be written 657 final AtomicBoolean continueWriting = new AtomicBoolean(true); 658 659 // write until socket buffer is full so as to create the conditions 660 // for when a write does not complete immediately 661 srcs = genBuffers(1); 662 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 663 new CompletionHandler<Long,Void>() { 664 public void completed(Long result, Void att) { 665 long n = result; 666 if (n <= 0) 667 throw new RuntimeException("No bytes written"); 668 bytesWritten.addAndGet(n); 669 if (continueWriting.get()) { 670 ByteBuffer[] srcs = genBuffers(8); 671 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 672 (Void)null, this); 673 } 674 } 675 public void failed(Throwable exc, Void att) { 676 } 677 }); 678 679 // give time for socket buffer to fill up. 680 Thread.sleep(5*1000); 681 682 // signal handler to stop further writing 683 continueWriting.set(false); 684 685 // read until done 686 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 687 long total = 0L; 688 do { 689 int n = sc.read(buf); 690 if (n <= 0) 691 throw new RuntimeException("No bytes read"); 692 buf.rewind(); 693 total += n; 694 } while (total < bytesWritten.get()); 695 696 ch.close(); 697 sc.close(); 698 } 699 } 700 701 static void testShutdown() throws Exception { 702 System.out.println("-- shutdown--"); 703 704 try (Server server = new Server(); 705 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 706 { 707 ch.connect(server.address()).get(); 708 try (SocketChannel peer = server.accept()) { 709 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 710 int n; 711 712 // check read 713 ch.shutdownInput(); 714 n = ch.read(buf).get(); 715 if (n != -1) 716 throw new RuntimeException("-1 expected"); 717 // check full with full buffer 718 buf.put(new byte[100]); 719 n = ch.read(buf).get(); 720 if (n != -1) 721 throw new RuntimeException("-1 expected"); 722 723 // check write 724 ch.shutdownOutput(); 725 try { 726 ch.write(buf).get(); 727 throw new RuntimeException("ClosedChannelException expected"); 728 } catch (ExecutionException x) { 729 if (!(x.getCause() instanceof ClosedChannelException)) 730 throw new RuntimeException("ClosedChannelException expected"); 731 } 732 } 733 } 734 } 735 736 static void testTimeout() throws Exception { 737 System.out.println("-- timeouts --"); 738 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 739 testTimeout(-1L, TimeUnit.SECONDS); 740 testTimeout(0L, TimeUnit.SECONDS); 741 testTimeout(2L, TimeUnit.SECONDS); 742 } 743 744 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 745 try (Server server = new Server()) { 746 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 747 ch.connect(server.address()).get(); 748 749 ByteBuffer dst = ByteBuffer.allocate(512); 750 751 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 752 753 // this read should timeout if value is > 0 754 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 755 public void completed(Integer result, Void att) { 756 readException.set(new RuntimeException("Should not complete")); 757 } 758 public void failed(Throwable exc, Void att) { 759 readException.set(exc); 760 } 761 }); 762 if (timeout > 0L) { 763 // wait for exception 764 while (readException.get() == null) { 765 Thread.sleep(100); 766 } 767 if (!(readException.get() instanceof InterruptedByTimeoutException)) 768 throw new RuntimeException("InterruptedByTimeoutException expected"); 769 770 // after a timeout then further reading should throw unspecified runtime exception 771 boolean exceptionThrown = false; 772 try { 773 ch.read(dst); 774 } catch (RuntimeException x) { 775 exceptionThrown = true; 776 } 777 if (!exceptionThrown) 778 throw new RuntimeException("RuntimeException expected after timeout."); 779 } else { 780 Thread.sleep(1000); 781 Throwable exc = readException.get(); 782 if (exc != null) 783 throw new RuntimeException(exc); 784 } 785 786 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 787 788 // write bytes to fill socket buffer 789 ch.write(genBuffer(), timeout, unit, ch, 790 new CompletionHandler<Integer,AsynchronousSocketChannel>() 791 { 792 public void completed(Integer result, AsynchronousSocketChannel ch) { 793 ch.write(genBuffer(), timeout, unit, ch, this); 794 } 795 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 796 writeException.set(exc); 797 } 798 }); 799 if (timeout > 0) { 800 // wait for exception 801 while (writeException.get() == null) { 802 Thread.sleep(100); 803 } 804 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 805 throw new RuntimeException("InterruptedByTimeoutException expected"); 806 807 // after a timeout then further writing should throw unspecified runtime exception 808 boolean exceptionThrown = false; 809 try { 810 ch.write(genBuffer()); 811 } catch (RuntimeException x) { 812 exceptionThrown = true; 813 } 814 if (!exceptionThrown) 815 throw new RuntimeException("RuntimeException expected after timeout."); 816 } else { 817 Thread.sleep(1000); 818 Throwable exc = writeException.get(); 819 if (exc != null) 820 throw new RuntimeException(exc); 821 } 822 823 // clean-up 824 server.accept().close(); 825 ch.close(); 826 } 827 } 828 829 // returns ByteBuffer with random bytes 830 static ByteBuffer genBuffer() { 831 int size = 1024 + rand.nextInt(16000); 832 byte[] buf = new byte[size]; 833 rand.nextBytes(buf); 834 boolean useDirect = rand.nextBoolean(); 835 if (useDirect) { 836 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 837 bb.put(buf); 838 bb.flip(); 839 return bb; 840 } else { 841 return ByteBuffer.wrap(buf); 842 } 843 } 844 845 // return ByteBuffer[] with random bytes 846 static ByteBuffer[] genBuffers(int max) { 847 int len = 1; 848 if (max > 1) 849 len += rand.nextInt(max); 850 ByteBuffer[] bufs = new ByteBuffer[len]; 851 for (int i=0; i<len; i++) 852 bufs[i] = genBuffer(); 853 return bufs; 854 } 855 856 // return random SocketAddress 857 static SocketAddress genSocketAddress() { 858 StringBuilder sb = new StringBuilder("10."); 859 sb.append(rand.nextInt(256)); 860 sb.append('.'); 861 sb.append(rand.nextInt(256)); 862 sb.append('.'); 863 sb.append(rand.nextInt(256)); 864 InetAddress rh; 865 try { 866 rh = InetAddress.getByName(sb.toString()); 867 } catch (UnknownHostException x) { 868 throw new InternalError("Should not happen"); 869 } 870 return new InetSocketAddress(rh, rand.nextInt(65535)+1); 871 } 872 }