1 /*
   2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 6834246 6842687
  26  * @summary Stress test connections through the loopback interface
  27  * @run main StressLoopback
  28  * @run main/othervm -Djdk.net.useFastTcpLoopback StressLoopback
  29  */
  30 
  31 import java.nio.ByteBuffer;
  32 import java.net.*;
  33 import java.nio.channels.*;
  34 import java.util.Random;
  35 import java.io.IOException;
  36 
  37 public class StressLoopback {
  38     static final Random rand = new Random();
  39 
  40     public static void main(String[] args) throws Exception {
  41         // setup listener
  42         AsynchronousServerSocketChannel listener =
  43             AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
  44         int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
  45         InetAddress lh = InetAddress.getLocalHost();
  46         SocketAddress remote = new InetSocketAddress(lh, port);
  47 
  48         // create sources and sinks
  49         int count = 2 + rand.nextInt(9);
  50         Source[] source = new Source[count];
  51         Sink[] sink = new Sink[count];
  52         for (int i=0; i<count; i++) {
  53             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
  54             ch.connect(remote).get();
  55             source[i] = new Source(ch);
  56             sink[i] = new Sink(listener.accept().get());
  57         }
  58 
  59         // start the sinks and sources
  60         for (int i=0; i<count; i++) {
  61             sink[i].start();
  62             source[i].start();
  63         }
  64 
  65         // let the test run for a while
  66         Thread.sleep(20*1000);
  67 
  68         // wait until everyone is done
  69         boolean failed = false;
  70         long total = 0L;
  71         for (int i=0; i<count; i++) {
  72             long nwrote = source[i].finish();
  73             long nread = sink[i].finish();
  74             if (nread != nwrote)
  75                 failed = true;
  76             System.out.format("%d -> %d (%s)\n",
  77                 nwrote, nread, (failed) ? "FAIL" : "PASS");
  78             total += nwrote;
  79         }
  80         if (failed)
  81             throw new RuntimeException("Test failed - see log for details");
  82         System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
  83     }
  84 
  85     /**
  86      * Writes bytes to a channel until "done". When done the channel is closed.
  87      */
  88     static class Source {
  89         private final AsynchronousByteChannel channel;
  90         private final ByteBuffer sentBuffer;
  91         private volatile long bytesSent;
  92         private volatile boolean finished;
  93 
  94         Source(AsynchronousByteChannel channel) {
  95             this.channel = channel;
  96             int size = 1024 + rand.nextInt(10000);
  97             this.sentBuffer = (rand.nextBoolean()) ?
  98                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
  99         }
 100 
 101         void start() {
 102             sentBuffer.position(0);
 103             sentBuffer.limit(sentBuffer.capacity());
 104             channel.write(sentBuffer, (Void)null, new CompletionHandler<Integer,Void> () {
 105                 public void completed(Integer nwrote, Void att) {
 106                     bytesSent += nwrote;
 107                     if (finished) {
 108                         closeUnchecked(channel);
 109                     } else {
 110                         sentBuffer.position(0);
 111                         sentBuffer.limit(sentBuffer.capacity());
 112                         channel.write(sentBuffer, (Void)null, this);
 113                     }
 114                 }
 115                 public void failed(Throwable exc, Void att) {
 116                     exc.printStackTrace();
 117                     closeUnchecked(channel);
 118                 }
 119             });
 120         }
 121 
 122         long finish() {
 123             finished = true;
 124             waitUntilClosed(channel);
 125             return bytesSent;
 126         }
 127     }
 128 
 129     /**
 130      * Read bytes from a channel until EOF is received.
 131      */
 132     static class Sink {
 133         private final AsynchronousByteChannel channel;
 134         private final ByteBuffer readBuffer;
 135         private volatile long bytesRead;
 136 
 137         Sink(AsynchronousByteChannel channel) {
 138             this.channel = channel;
 139             int size = 1024 + rand.nextInt(10000);
 140             this.readBuffer = (rand.nextBoolean()) ?
 141                 ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
 142         }
 143 
 144         void start() {
 145             channel.read(readBuffer, (Void)null, new CompletionHandler<Integer,Void> () {
 146                 public void completed(Integer nread, Void att) {
 147                     if (nread < 0) {
 148                         closeUnchecked(channel);
 149                     } else {
 150                         bytesRead += nread;
 151                         readBuffer.clear();
 152                         channel.read(readBuffer, (Void)null, this);
 153                     }
 154                 }
 155                 public void failed(Throwable exc, Void att) {
 156                     exc.printStackTrace();
 157                     closeUnchecked(channel);
 158                 }
 159             });
 160         }
 161 
 162         long finish() {
 163             waitUntilClosed(channel);
 164             return bytesRead;
 165         }
 166     }
 167 
 168     static void waitUntilClosed(Channel c) {
 169         while (c.isOpen()) {
 170             try {
 171                 Thread.sleep(100);
 172             } catch (InterruptedException ignore) { }
 173         }
 174     }
 175 
 176     static void closeUnchecked(Channel c) {
 177         try {
 178             c.close();
 179         } catch (IOException ignore) { }
 180     }
 181 }