1 /*
   2  * Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6842687
  26  * @summary Unit test for AsynchronousChannelGroup
  27  */
  28 
  29 import java.nio.channels.*;
  30 import java.net.*;
  31 import java.util.*;
  32 import java.util.concurrent.*;
  33 import java.util.concurrent.atomic.*;
  34 import java.io.IOException;
  35 
  36 /**
  37  * Exercise replacement of threads in the thread pool when completion handlers
  38  * terminate due to errors or runtime exceptions.
  39  */
  40 
  41 public class Restart {
  42     static final Random rand = new Random();
  43 
  44     public static void main(String[] args) throws Exception {
  45         // thread group for thread pools
  46         final ThreadGroup tg = new ThreadGroup("test");
  47 
  48         // keep track of the number of threads that terminate
  49         final AtomicInteger exceptionCount = new AtomicInteger(0);
  50         final Thread.UncaughtExceptionHandler ueh =
  51             new Thread.UncaughtExceptionHandler() {
  52                 public void uncaughtException(Thread t, Throwable e) {
  53                     exceptionCount.incrementAndGet();
  54                 }
  55             };
  56         ThreadFactory factory = new ThreadFactory() {
  57             @Override
  58             public Thread newThread(Runnable r) {
  59                 Thread t = new Thread(tg, r);
  60                 t.setUncaughtExceptionHandler(ueh);
  61                 return t;
  62             }
  63         };
  64 
  65         // group with fixed thread pool
  66         int nThreads = 1 + rand.nextInt(4);
  67         AsynchronousChannelGroup group =
  68             AsynchronousChannelGroup.withFixedThreadPool(nThreads, factory);
  69         testRestart(group, 100);
  70         group.shutdown();
  71 
  72         // group with cached thread pool
  73         ExecutorService pool = Executors.newCachedThreadPool(factory);
  74         group = AsynchronousChannelGroup.withCachedThreadPool(pool, rand.nextInt(5));
  75         testRestart(group, 100);
  76         group.shutdown();
  77 
  78         // group with custom thread pool
  79         group = AsynchronousChannelGroup
  80                 .withThreadPool(Executors.newFixedThreadPool(1+rand.nextInt(5), factory));
  81         testRestart(group, 100);
  82         group.shutdown();
  83 
  84         // give time for threads to terminate
  85         Thread.sleep(3000);
  86         int actual = exceptionCount.get();
  87         if (actual != 300)
  88             throw new RuntimeException(actual + " exceptions, expected: " + 300);
  89     }
  90 
  91     static void testRestart(AsynchronousChannelGroup group, int count)
  92         throws Exception
  93     {
  94         AsynchronousServerSocketChannel listener =
  95             AsynchronousServerSocketChannel.open(group)
  96                 .bind(new InetSocketAddress(0));
  97 
  98         for (int i=0; i<count; i++) {
  99             final CountDownLatch latch = new CountDownLatch(1);
 100 
 101             listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
 102                 public void completed(AsynchronousSocketChannel ch, Void att) {
 103                     try {
 104                         ch.close();
 105                     } catch (IOException ignore) { }
 106 
 107                     latch.countDown();
 108 
 109                     // throw error or runtime exception
 110                     if (rand.nextBoolean()) {
 111                         throw new Error();
 112                     } else {
 113                         throw new RuntimeException();
 114                     }
 115                 }
 116                 public void failed(Throwable exc, Void att) {
 117                 }
 118             });
 119 
 120             // establish loopback connection which should cause completion
 121             // handler to be invoked.
 122             int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
 123             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
 124             InetAddress lh = InetAddress.getLocalHost();
 125             ch.connect(new InetSocketAddress(lh, port)).get();
 126             ch.close();
 127 
 128             // wait for handler to be invoked
 129             latch.await();
 130         }
 131 
 132         // clean-up
 133         listener.close();
 134     }
 135 }