1 /*
   2  * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @summary Test selectors and socketchannels
  26  * @library ..
  27  */
  28 
  29 import java.io.*;
  30 import java.net.*;
  31 import java.nio.*;
  32 import java.nio.channels.*;
  33 import java.nio.channels.spi.SelectorProvider;
  34 import java.util.*;
  35 
  36 
  37 public class SelectorTest {
  38     private static List clientList = new LinkedList();
  39     private static Random rnd = new Random();
  40     public static int NUM_CLIENTS = 30;
  41     public static int TEST_PORT = 31452;
  42     static PrintStream log = System.err;
  43     private static int FINISH_TIME = 30000;
  44 
  45     /*
  46      * Usage note
  47      *
  48      * java SelectorTest [server] [client <host>] [<port>]
  49      *
  50      * No arguments runs both client and server in separate threads
  51      * using the default port of 31452.
  52      *
  53      * client runs the client on this machine and connects to server
  54      * at the given IP address.
  55      *
  56      * server runs the server on localhost.
  57      */
  58     public static void main(String[] args) throws Exception {
  59         if (args.length == 0) {
  60             Server server = new Server(0);
  61             server.start();
  62             try {
  63                 Thread.sleep(1000);
  64             } catch (InterruptedException e) { }
  65             InetSocketAddress isa
  66                 = new InetSocketAddress(InetAddress.getLocalHost(), server.port());
  67             Client client = new Client(isa);
  68             client.start();
  69             if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0)
  70                 throw new Exception("Failure");
  71             log.println();
  72 
  73         } else if (args[0].equals("server")) {
  74 
  75             if (args.length > 1)
  76                 TEST_PORT = Integer.parseInt(args[1]);
  77             Server server = new Server(TEST_PORT);
  78             server.start();
  79             if (server.finish(FINISH_TIME) == 0)
  80                 throw new Exception("Failure");
  81             log.println();
  82 
  83         } else if (args[0].equals("client")) {
  84 
  85             if (args.length < 2) {
  86                 log.println("No host specified: terminating.");
  87                 return;
  88             }
  89             String ip = args[1];
  90             if (args.length > 2)
  91                 TEST_PORT = Integer.parseInt(args[2]);
  92             InetAddress ia = InetAddress.getByName(ip);
  93             InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT);
  94             Client client = new Client(isa);
  95             client.start();
  96             if (client.finish(FINISH_TIME) == 0)
  97                 throw new Exception("Failure");
  98             log.println();
  99 
 100         } else {
 101             System.out.println("Usage note:");
 102             System.out.println("java SelectorTest [server] [client <host>] [<port>]");
 103             System.out.println("No arguments runs both client and server in separate threads using the default port of 31452.");
 104             System.out.println("client runs the client on this machine and connects to the server specified.");
 105             System.out.println("server runs the server on localhost.");
 106         }
 107     }
 108 
 109     static class Client extends TestThread {
 110         InetSocketAddress isa;
 111         Client(InetSocketAddress isa) {
 112             super("Client", SelectorTest.log);
 113             this.isa = isa;
 114         }
 115 
 116         public void go() throws Exception {
 117             log.println("starting client...");
 118             for (int i=0; i<NUM_CLIENTS; i++)
 119                 clientList.add(new RemoteEntity(i, isa, log));
 120 
 121             Collections.shuffle(clientList);
 122 
 123             log.println("created "+NUM_CLIENTS+" clients");
 124             do {
 125                 for (Iterator i = clientList.iterator(); i.hasNext(); ) {
 126                     RemoteEntity re = (RemoteEntity) i.next();
 127                     if (re.cycle()) {
 128                         i.remove();
 129                     }
 130                 }
 131                 Collections.shuffle(clientList);
 132             } while (clientList.size() > 0);
 133         }
 134     }
 135 
 136     static class Server extends TestThread {
 137         private final ServerSocketChannel ssc;
 138         private List socketList = new ArrayList();
 139         private ServerSocket ss;
 140         private int connectionsAccepted = 0;
 141         private Selector pollSelector;
 142         private Selector acceptSelector;
 143         private Set pkeys;
 144         private Set pskeys;
 145 
 146         Server(int port) throws IOException {
 147             super("Server", SelectorTest.log);
 148             this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port));
 149         }
 150 
 151         int port() {
 152             return ssc.socket().getLocalPort();
 153         }
 154 
 155         public void go() throws Exception {
 156             log.println("starting server...");
 157             acceptSelector = SelectorProvider.provider().openSelector();
 158             pollSelector = SelectorProvider.provider().openSelector();
 159             pkeys = pollSelector.keys();
 160             pskeys = pollSelector.selectedKeys();
 161             Set readyKeys = acceptSelector.selectedKeys();
 162             RequestHandler rh = new RequestHandler(pollSelector, log);
 163             Thread requestThread = new Thread(rh);
 164 
 165             requestThread.start();
 166 
 167             ssc.configureBlocking(false);
 168             SelectionKey acceptKey = ssc.register(acceptSelector,
 169                                                   SelectionKey.OP_ACCEPT);
 170             while(connectionsAccepted < SelectorTest.NUM_CLIENTS) {
 171                 int keysAdded = acceptSelector.select(100);
 172                 if (keysAdded > 0) {
 173                     Iterator i = readyKeys.iterator();
 174                     while(i.hasNext()) {
 175                         SelectionKey sk = (SelectionKey)i.next();
 176                         i.remove();
 177                         ServerSocketChannel nextReady =
 178                             (ServerSocketChannel)sk.channel();
 179                         SocketChannel sc = nextReady.accept();
 180                         connectionsAccepted++;
 181                         if (sc != null) {
 182                             sc.configureBlocking(false);
 183                             synchronized (pkeys) {
 184                                sc.register(pollSelector, SelectionKey.OP_READ);
 185                             }
 186                         } else {
 187                             throw new RuntimeException(
 188                                 "Socket does not support Channels");
 189                         }
 190                     }
 191                 }
 192             }
 193             acceptKey.cancel();
 194             requestThread.join();
 195             acceptSelector.close();
 196             pollSelector.close();
 197         }
 198     }
 199 }
 200 
 201 class RemoteEntity {
 202     private static Random rnd = new Random();
 203     int id;
 204     ByteBuffer data;
 205     int dataWrittenIndex;
 206     int totalDataLength;
 207     boolean initiated = false;
 208     boolean connected = false;
 209     boolean written = false;
 210     boolean acked = false;
 211     boolean closed = false;
 212     private SocketChannel sc;
 213     ByteBuffer ackBuffer;
 214     PrintStream log;
 215     InetSocketAddress server;
 216 
 217     RemoteEntity(int id, InetSocketAddress server, PrintStream log)
 218         throws Exception
 219     {
 220         int connectFailures = 0;
 221         this.id = id;
 222         this.log = log;
 223         this.server = server;
 224 
 225         sc = SocketChannel.open();
 226         sc.configureBlocking(false);
 227 
 228         // Prepare the data buffer to write out from this entity
 229         // Let's use both slow and fast buffers
 230         if (rnd.nextBoolean())
 231             data = ByteBuffer.allocateDirect(100);
 232         else
 233             data = ByteBuffer.allocate(100);
 234         String number = Integer.toString(id);
 235         if (number.length() == 1)
 236             number = "0"+number;
 237         String source = "Testing from " + number;
 238         data.put(source.getBytes("8859_1"));
 239         data.flip();
 240         totalDataLength = source.length();
 241 
 242         // Allocate an ack buffer
 243         ackBuffer = ByteBuffer.allocateDirect(10);
 244     }
 245 
 246     private void reset() throws Exception {
 247         sc.close();
 248         sc = SocketChannel.open();
 249         sc.configureBlocking(false);
 250     }
 251 
 252     private void connect() throws Exception {
 253         try {
 254             connected = sc.connect(server);
 255             initiated = true;
 256         }  catch (ConnectException e) {
 257             initiated = false;
 258             reset();
 259         }
 260     }
 261 
 262     private void finishConnect() throws Exception {
 263         try {
 264             connected = sc.finishConnect();
 265         }  catch (IOException e) {
 266             initiated = false;
 267             reset();
 268         }
 269     }
 270 
 271     int id() {
 272         return id;
 273     }
 274 
 275     boolean cycle() throws Exception {
 276         if (!initiated)
 277             connect();
 278         else if (!connected)
 279             finishConnect();
 280         else if (!written)
 281             writeCycle();
 282         else if (!acked)
 283             ackCycle();
 284         else if (!closed)
 285             close();
 286         return closed;
 287     }
 288 
 289     private void ackCycle() throws Exception {
 290         //log.println("acking from "+id);
 291         int bytesRead = sc.read(ackBuffer);
 292         if (bytesRead > 0) {
 293             acked = true;
 294         }
 295     }
 296 
 297     private void close() throws Exception {
 298         sc.close();
 299         closed = true;
 300     }
 301 
 302     private void writeCycle() throws Exception {
 303         log.println("writing from "+id);
 304         int numBytesToWrite = rnd.nextInt(10)+1;
 305         int newWriteTarget = dataWrittenIndex + numBytesToWrite;
 306         if (newWriteTarget > totalDataLength)
 307             newWriteTarget = totalDataLength;
 308         data.limit(newWriteTarget);
 309         int bytesWritten = sc.write(data);
 310         if (bytesWritten > 0)
 311             dataWrittenIndex += bytesWritten;
 312         if (dataWrittenIndex == totalDataLength) {
 313             written = true;
 314             sc.socket().shutdownOutput();
 315         }
 316     }
 317 
 318 }
 319 
 320 
 321 class RequestHandler implements Runnable {
 322     private static Random rnd = new Random();
 323     private Selector selector;
 324     private int connectionsHandled = 0;
 325     private HashMap dataBin = new HashMap();
 326     PrintStream log;
 327 
 328     public RequestHandler(Selector selector, PrintStream log) {
 329         this.selector = selector;
 330         this.log = log;
 331     }
 332 
 333     public void run() {
 334         log.println("starting request handler...");
 335         int connectionsAccepted = 0;
 336 
 337         Set nKeys = selector.keys();
 338         Set readyKeys = selector.selectedKeys();
 339 
 340         try {
 341             while(connectionsHandled < SelectorTest.NUM_CLIENTS) {
 342                 int numKeys = selector.select(100);
 343 
 344                 // Process channels with data
 345                 synchronized (nKeys) {
 346                     if (readyKeys.size() > 0) {
 347                         Iterator i = readyKeys.iterator();
 348                         while(i.hasNext()) {
 349                             SelectionKey sk = (SelectionKey)i.next();
 350                             i.remove();
 351                             SocketChannel sc = (SocketChannel)sk.channel();
 352                             if (sc.isOpen())
 353                                 read(sk, sc);
 354                         }
 355                     }
 356                 }
 357 
 358                 // Give other threads a chance to run
 359                 if (numKeys == 0) {
 360                     try {
 361                         Thread.sleep(1);
 362                     } catch (Exception x) {}
 363                 }
 364             }
 365         } catch (Exception e) {
 366             log.println("Unexpected error 1: "+e);
 367             e.printStackTrace();
 368         }
 369     }
 370 
 371     private void read(SelectionKey sk, SocketChannel sc) throws Exception {
 372         ByteBuffer bin = (ByteBuffer)dataBin.get(sc);
 373         if (bin == null) {
 374             if (rnd.nextBoolean())
 375                 bin = ByteBuffer.allocateDirect(100);
 376             else
 377                 bin = ByteBuffer.allocate(100);
 378             dataBin.put(sc, bin);
 379         }
 380 
 381         int bytesRead = 0;
 382         do {
 383             bytesRead = sc.read(bin);
 384         } while(bytesRead > 0);
 385 
 386         if (bytesRead == -1) {
 387             sk.interestOps(0);
 388             bin.flip();
 389             int size = bin.limit();
 390             byte[] data = new byte[size];
 391             for(int j=0; j<size; j++)
 392                 data[j] = bin.get();
 393             String message = new String(data, "8859_1");
 394             connectionsHandled++;
 395             acknowledge(sc);
 396             log.println("Received >>>"+message + "<<<");
 397             log.println("Handled: "+connectionsHandled);
 398         }
 399     }
 400 
 401     private void acknowledge(SocketChannel sc) throws Exception {
 402             ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10);
 403             String s = "ack";
 404             ackBuffer.put(s.getBytes("8859_1"));
 405             ackBuffer.flip();
 406             int bytesWritten = 0;
 407             while(bytesWritten == 0) {
 408                 bytesWritten += sc.write(ackBuffer);
 409             }
 410             sc.close();
 411     }
 412 }