1 /*
   2  * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @summary Test selectors and socketchannels
  26  * @library ..
  27  * @key randomness
  28  */
  29 
  30 import java.io.*;
  31 import java.net.*;
  32 import java.nio.*;
  33 import java.nio.channels.*;
  34 import java.nio.channels.spi.SelectorProvider;
  35 import java.util.*;
  36 
  37 
  38 public class SelectorTest {
  39     private static List clientList = new LinkedList();
  40     private static Random rnd = new Random();
  41     public static int NUM_CLIENTS = 30;
  42     public static int TEST_PORT = 31452;
  43     static PrintStream log = System.err;
  44     private static int FINISH_TIME = 30000;
  45 
  46     /*
  47      * Usage note
  48      *
  49      * java SelectorTest [server] [client <host>] [<port>]
  50      *
  51      * No arguments runs both client and server in separate threads
  52      * using the default port of 31452.
  53      *
  54      * client runs the client on this machine and connects to server
  55      * at the given IP address.
  56      *
  57      * server runs the server on localhost.
  58      */
  59     public static void main(String[] args) throws Exception {
  60         if (args.length == 0) {
  61             Server server = new Server(0);
  62             server.start();
  63             try {
  64                 Thread.sleep(1000);
  65             } catch (InterruptedException e) { }
  66             InetSocketAddress isa
  67                 = new InetSocketAddress(InetAddress.getLocalHost(), server.port());
  68             Client client = new Client(isa);
  69             client.start();
  70             if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)
  71                 throw new Exception("Failure");
  72             log.println();
  73 
  74         } else if (args[0].equals("server")) {
  75 
  76             if (args.length > 1)
  77                 TEST_PORT = Integer.parseInt(args[1]);
  78             Server server = new Server(TEST_PORT);
  79             server.start();
  80             if (server.finish(FINISH_TIME) == 0)
  81                 throw new Exception("Failure");
  82             log.println();
  83 
  84         } else if (args[0].equals("client")) {
  85 
  86             if (args.length < 2) {
  87                 log.println("No host specified: terminating.");
  88                 return;
  89             }
  90             String ip = args[1];
  91             if (args.length > 2)
  92                 TEST_PORT = Integer.parseInt(args[2]);
  93             InetAddress ia = InetAddress.getByName(ip);
  94             InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);
  95             Client client = new Client(isa);
  96             client.start();
  97             if (client.finish(FINISH_TIME) == 0)
  98                 throw new Exception("Failure");
  99             log.println();
 100 
 101         } else {
 102             System.out.println("Usage note:");
 103             System.out.println("java SelectorTest [server] [client <host>] [<port>]");
 104             System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");
 105             System.out.println("client runs the client on this machine and connects to the server specified.");
 106             System.out.println("server runs the server on localhost.");
 107         }
 108     }
 109 
 110     static class Client extends TestThread {
 111         InetSocketAddress isa;
 112         Client(InetSocketAddress isa) {
 113             super("Client", SelectorTest.log);
 114             this.isa = isa;
 115         }
 116 
 117         public void go() throws Exception {
 118             log.println("starting client...");
 119             for (int i=0; i<NUM_CLIENTS; i++)
 120                 clientList.add(new RemoteEntity(i, isa, log));
 121 
 122             Collections.shuffle(clientList);
 123 
 124             log.println("created "+NUM_CLIENTS+" clients");
 125             do {
 126                 for (Iterator i = clientList.iterator(); i.hasNext(); ) {
 127                     RemoteEntity re = (RemoteEntity) i.next();
 128                     if (re.cycle()) {
 129                         i.remove();
 130                     }
 131                 }
 132                 Collections.shuffle(clientList);
 133             } while (clientList.size() > 0);
 134         }
 135     }
 136 
 137     static class Server extends TestThread {
 138         private final ServerSocketChannel ssc;
 139         private List socketList = new ArrayList();
 140         private ServerSocket ss;
 141         private int connectionsAccepted = 0;
 142         private Selector pollSelector;
 143         private Selector acceptSelector;
 144         private Set pkeys;
 145         private Set pskeys;
 146 
 147         Server(int port) throws IOException {
 148             super("Server", SelectorTest.log);
 149             this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port));
 150         }
 151 
 152         int port() {
 153             return ssc.socket().getLocalPort();
 154         }
 155 
 156         public void go() throws Exception {
 157             log.println("starting server...");
 158             acceptSelector = SelectorProvider.provider().openSelector();
 159             pollSelector = SelectorProvider.provider().openSelector();
 160             pkeys = pollSelector.keys();
 161             pskeys = pollSelector.selectedKeys();
 162             Set readyKeys = acceptSelector.selectedKeys();
 163             RequestHandler rh = new RequestHandler(pollSelector, log);
 164             Thread requestThread = new Thread(rh);
 165 
 166             requestThread.start();
 167 
 168             ssc.configureBlocking(false);
 169             SelectionKey acceptKey = ssc.register(acceptSelector,
 170                                                   SelectionKey.OP_ACCEPT);
 171             while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {
 172                 int keysAdded = acceptSelector.select(100);
 173                 if (keysAdded > 0) {
 174                     Iterator i = readyKeys.iterator();
 175                     while(i.hasNext()) {
 176                         SelectionKey sk = (SelectionKey)i.next();
 177                         i.remove();
 178                         ServerSocketChannel nextReady =
 179                             (ServerSocketChannel)sk.channel();
 180                         SocketChannel sc = nextReady.accept();
 181                         connectionsAccepted++;
 182                         if (sc != null) {
 183                             sc.configureBlocking(false);
 184                             synchronized (pkeys) {
 185                                sc.register(pollSelector, SelectionKey.OP_READ);
 186                             }
 187                         } else {
 188                             throw new RuntimeException(
 189                                 "Socket does not support Channels");
 190                         }
 191                     }
 192                 }
 193             }
 194             acceptKey.cancel();
 195             requestThread.join();
 196             acceptSelector.close();
 197             pollSelector.close();
 198         }
 199     }
 200 }
 201 
 202 class RemoteEntity {
 203     private static Random rnd = new Random();
 204     int id;
 205     ByteBuffer data;
 206     int dataWrittenIndex;
 207     int totalDataLength;
 208     boolean initiated = false;
 209     boolean connected = false;
 210     boolean written = false;
 211     boolean acked = false;
 212     boolean closed = false;
 213     private SocketChannel sc;
 214     ByteBuffer ackBuffer;
 215     PrintStream log;
 216     InetSocketAddress server;
 217 
 218     RemoteEntity(int id, InetSocketAddress server, PrintStream log)
 219         throws Exception
 220     {
 221         int connectFailures = 0;
 222         this.id = id;
 223         this.log = log;
 224         this.server = server;
 225 
 226         sc = SocketChannel.open();
 227         sc.configureBlocking(false);
 228 
 229         // Prepare the data buffer to write out from this entity
 230         // Let's use both slow and fast buffers
 231         if (rnd.nextBoolean())
 232             data = ByteBuffer.allocateDirect(100);
 233         else
 234             data = ByteBuffer.allocate(100);
 235         String number = Integer.toString(id);
 236         if (number.length() == 1)
 237             number = "0"+number;
 238         String source = "Testing from " + number;
 239         data.put(source.getBytes("8859_1"));
 240         data.flip();
 241         totalDataLength = source.length();
 242 
 243         // Allocate an ack buffer
 244         ackBuffer = ByteBuffer.allocateDirect(10);
 245     }
 246 
 247     private void reset() throws Exception {
 248         sc.close();
 249         sc = SocketChannel.open();
 250         sc.configureBlocking(false);
 251     }
 252 
 253     private void connect() throws Exception {
 254         try {
 255             connected = sc.connect(server);
 256             initiated = true;
 257         }  catch (ConnectException e) {
 258             initiated = false;
 259             reset();
 260         }
 261     }
 262 
 263     private void finishConnect() throws Exception {
 264         try {
 265             connected = sc.finishConnect();
 266         }  catch (IOException e) {
 267             initiated = false;
 268             reset();
 269         }
 270     }
 271 
 272     int id() {
 273         return id;
 274     }
 275 
 276     boolean cycle() throws Exception {
 277         if (!initiated)
 278             connect();
 279         else if (!connected)
 280             finishConnect();
 281         else if (!written)
 282             writeCycle();
 283         else if (!acked)
 284             ackCycle();
 285         else if (!closed)
 286             close();
 287         return closed;
 288     }
 289 
 290     private void ackCycle() throws Exception {
 291         //log.println("acking from "+id);
 292         int bytesRead = sc.read(ackBuffer);
 293         if (bytesRead > 0) {
 294             acked = true;
 295         }
 296     }
 297 
 298     private void close() throws Exception {
 299         sc.close();
 300         closed = true;
 301     }
 302 
 303     private void writeCycle() throws Exception {
 304         log.println("writing from "+id);
 305         int numBytesToWrite = rnd.nextInt(10)+1;
 306         int newWriteTarget = dataWrittenIndex + numBytesToWrite;
 307         if (newWriteTarget > totalDataLength)
 308             newWriteTarget = totalDataLength;
 309         data.limit(newWriteTarget);
 310         int bytesWritten = sc.write(data);
 311         if (bytesWritten > 0)
 312             dataWrittenIndex += bytesWritten;
 313         if (dataWrittenIndex == totalDataLength) {
 314             written = true;
 315             sc.socket().shutdownOutput();
 316         }
 317     }
 318 
 319 }
 320 
 321 
 322 class RequestHandler implements Runnable {
 323     private static Random rnd = new Random();
 324     private Selector selector;
 325     private int connectionsHandled = 0;
 326     private HashMap dataBin = new HashMap();
 327     PrintStream log;
 328 
 329     public RequestHandler(Selector selector, PrintStream log) {
 330         this.selector = selector;
 331         this.log = log;
 332     }
 333 
 334     public void run() {
 335         log.println("starting request handler...");
 336         int connectionsAccepted = 0;
 337 
 338         Set nKeys = selector.keys();
 339         Set readyKeys = selector.selectedKeys();
 340 
 341         try {
 342             while(connectionsHandled < SelectorTest.NUM_CLIENTS) {
 343                 int numKeys = selector.select(100);
 344 
 345                 // Process channels with data
 346                 synchronized (nKeys) {
 347                     if (readyKeys.size() > 0) {
 348                         Iterator i = readyKeys.iterator();
 349                         while(i.hasNext()) {
 350                             SelectionKey sk = (SelectionKey)i.next();
 351                             i.remove();
 352                             SocketChannel sc = (SocketChannel)sk.channel();
 353                             if (sc.isOpen())
 354                                 read(sk, sc);
 355                         }
 356                     }
 357                 }
 358 
 359                 // Give other threads a chance to run
 360                 if (numKeys == 0) {
 361                     try {
 362                         Thread.sleep(1);
 363                     } catch (Exception x) {}
 364                 }
 365             }
 366         } catch (Exception e) {
 367             log.println("Unexpected error 1: "+e);
 368             e.printStackTrace();
 369         }
 370     }
 371 
 372     private void read(SelectionKey sk, SocketChannel sc) throws Exception {
 373         ByteBuffer bin = (ByteBuffer)dataBin.get(sc);
 374         if (bin == null) {
 375             if (rnd.nextBoolean())
 376                 bin = ByteBuffer.allocateDirect(100);
 377             else
 378                 bin = ByteBuffer.allocate(100);
 379             dataBin.put(sc, bin);
 380         }
 381 
 382         int bytesRead = 0;
 383         do {
 384             bytesRead = sc.read(bin);
 385         } while(bytesRead > 0);
 386 
 387         if (bytesRead == -1) {
 388             sk.interestOps(0);
 389             bin.flip();
 390             int size = bin.limit();
 391             byte[] data = new byte[size];
 392             for(int j=0; j<size; j++)
 393                 data[j] = bin.get();
 394             String message = new String(data, "8859_1");
 395             connectionsHandled++;
 396             acknowledge(sc);
 397             log.println("Received >>>"+message + "<<<");
 398             log.println("Handled: "+connectionsHandled);
 399         }
 400     }
 401 
 402     private void acknowledge(SocketChannel sc) throws Exception {
 403             ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);
 404             String s = "ack";
 405             ackBuffer.put(s.getBytes("8859_1"));
 406             ackBuffer.flip();
 407             int bytesWritten = 0;
 408             while(bytesWritten == 0) {
 409                 bytesWritten += sc.write(ackBuffer);
 410             }
 411             sc.close();
 412     }
 413 }