1 /* 2 * Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* @test 25 * @summary Test selectors and socketchannels 26 * @library .. 27 */ 28 29 import java.io.*; 30 import java.net.*; 31 import java.nio.*; 32 import java.nio.channels.*; 33 import java.nio.channels.spi.SelectorProvider; 34 import java.util.*; 35 36 37 public class SelectorTest { 38 private static List clientList = new LinkedList(); 39 private static Random rnd = new Random(); 40 public static int NUM_CLIENTS = 30; 41 public static int TEST_PORT = 31452; 42 static PrintStream log = System.err; 43 private static int FINISH_TIME = 30000; 44 45 /* 46 * Usage note 47 * 48 * java SelectorTest [server] [client <host>] [<port>] 49 * 50 * No arguments runs both client and server in separate threads 51 * using the default port of 31452. 52 * 53 * client runs the client on this machine and connects to server 54 * at the given IP address. 55 * 56 * server runs the server on localhost. 57 */ 58 public static void main(String[] args) throws Exception { 59 if (args.length == 0) { 60 Server server = new Server(0); 61 server.start(); 62 try { 63 Thread.sleep(1000); 64 } catch (InterruptedException e) { } 65 InetSocketAddress isa 66 = new InetSocketAddress(InetAddress.getLocalHost(), server.port()); 67 Client client = new Client(isa); 68 client.start(); 69 if ((server.finish(FINISH_TIME) & client.finish(FINISH_TIME)) == 0) 70 throw new Exception("Failure"); 71 log.println(); 72 73 } else if (args[0].equals("server")) { 74 75 if (args.length > 1) 76 TEST_PORT = Integer.parseInt(args[1]); 77 Server server = new Server(TEST_PORT); 78 server.start(); 79 if (server.finish(FINISH_TIME) == 0) 80 throw new Exception("Failure"); 81 log.println(); 82 83 } else if (args[0].equals("client")) { 84 85 if (args.length < 2) { 86 log.println("No host specified: terminating."); 87 return; 88 } 89 String ip = args[1]; 90 if (args.length > 2) 91 TEST_PORT = Integer.parseInt(args[2]); 92 InetAddress ia = InetAddress.getByName(ip); 93 InetSocketAddress isa = new InetSocketAddress(ia, TEST_PORT); 94 Client client = new Client(isa); 95 client.start(); 96 if (client.finish(FINISH_TIME) == 0) 97 throw new Exception("Failure"); 98 log.println(); 99 100 } else { 101 System.out.println("Usage note:"); 102 System.out.println("java SelectorTest [server] [client <host>] [<port>]"); 103 System.out.println("No arguments runs both client and server in separate threads using the default port of 31452."); 104 System.out.println("client runs the client on this machine and connects to the server specified."); 105 System.out.println("server runs the server on localhost."); 106 } 107 } 108 109 static class Client extends TestThread { 110 InetSocketAddress isa; 111 Client(InetSocketAddress isa) { 112 super("Client", SelectorTest.log); 113 this.isa = isa; 114 } 115 116 public void go() throws Exception { 117 log.println("starting client..."); 118 for (int i=0; i<NUM_CLIENTS; i++) 119 clientList.add(new RemoteEntity(i, isa, log)); 120 121 Collections.shuffle(clientList); 122 123 log.println("created "+NUM_CLIENTS+" clients"); 124 do { 125 for (Iterator i = clientList.iterator(); i.hasNext(); ) { 126 RemoteEntity re = (RemoteEntity) i.next(); 127 if (re.cycle()) { 128 i.remove(); 129 } 130 } 131 Collections.shuffle(clientList); 132 } while (clientList.size() > 0); 133 } 134 } 135 136 static class Server extends TestThread { 137 private final ServerSocketChannel ssc; 138 private List socketList = new ArrayList(); 139 private ServerSocket ss; 140 private int connectionsAccepted = 0; 141 private Selector pollSelector; 142 private Selector acceptSelector; 143 private Set pkeys; 144 private Set pskeys; 145 146 Server(int port) throws IOException { 147 super("Server", SelectorTest.log); 148 this.ssc = ServerSocketChannel.open().bind(new InetSocketAddress(port)); 149 } 150 151 int port() { 152 return ssc.socket().getLocalPort(); 153 } 154 155 public void go() throws Exception { 156 log.println("starting server..."); 157 acceptSelector = SelectorProvider.provider().openSelector(); 158 pollSelector = SelectorProvider.provider().openSelector(); 159 pkeys = pollSelector.keys(); 160 pskeys = pollSelector.selectedKeys(); 161 Set readyKeys = acceptSelector.selectedKeys(); 162 RequestHandler rh = new RequestHandler(pollSelector, log); 163 Thread requestThread = new Thread(rh); 164 165 requestThread.start(); 166 167 ssc.configureBlocking(false); 168 SelectionKey acceptKey = ssc.register(acceptSelector, 169 SelectionKey.OP_ACCEPT); 170 while(connectionsAccepted < SelectorTest.NUM_CLIENTS) { 171 int keysAdded = acceptSelector.select(100); 172 if (keysAdded > 0) { 173 Iterator i = readyKeys.iterator(); 174 while(i.hasNext()) { 175 SelectionKey sk = (SelectionKey)i.next(); 176 i.remove(); 177 ServerSocketChannel nextReady = 178 (ServerSocketChannel)sk.channel(); 179 SocketChannel sc = nextReady.accept(); 180 connectionsAccepted++; 181 if (sc != null) { 182 sc.configureBlocking(false); 183 synchronized (pkeys) { 184 sc.register(pollSelector, SelectionKey.OP_READ); 185 } 186 } else { 187 throw new RuntimeException( 188 "Socket does not support Channels"); 189 } 190 } 191 } 192 } 193 acceptKey.cancel(); 194 requestThread.join(); 195 acceptSelector.close(); 196 pollSelector.close(); 197 } 198 } 199 } 200 201 class RemoteEntity { 202 private static Random rnd = new Random(); 203 int id; 204 ByteBuffer data; 205 int dataWrittenIndex; 206 int totalDataLength; 207 boolean initiated = false; 208 boolean connected = false; 209 boolean written = false; 210 boolean acked = false; 211 boolean closed = false; 212 private SocketChannel sc; 213 ByteBuffer ackBuffer; 214 PrintStream log; 215 InetSocketAddress server; 216 217 RemoteEntity(int id, InetSocketAddress server, PrintStream log) 218 throws Exception 219 { 220 int connectFailures = 0; 221 this.id = id; 222 this.log = log; 223 this.server = server; 224 225 sc = SocketChannel.open(); 226 sc.configureBlocking(false); 227 228 // Prepare the data buffer to write out from this entity 229 // Let's use both slow and fast buffers 230 if (rnd.nextBoolean()) 231 data = ByteBuffer.allocateDirect(100); 232 else 233 data = ByteBuffer.allocate(100); 234 String number = Integer.toString(id); 235 if (number.length() == 1) 236 number = "0"+number; 237 String source = "Testing from " + number; 238 data.put(source.getBytes("8859_1")); 239 data.flip(); 240 totalDataLength = source.length(); 241 242 // Allocate an ack buffer 243 ackBuffer = ByteBuffer.allocateDirect(10); 244 } 245 246 private void reset() throws Exception { 247 sc.close(); 248 sc = SocketChannel.open(); 249 sc.configureBlocking(false); 250 } 251 252 private void connect() throws Exception { 253 try { 254 connected = sc.connect(server); 255 initiated = true; 256 } catch (ConnectException e) { 257 initiated = false; 258 reset(); 259 } 260 } 261 262 private void finishConnect() throws Exception { 263 try { 264 connected = sc.finishConnect(); 265 } catch (IOException e) { 266 initiated = false; 267 reset(); 268 } 269 } 270 271 int id() { 272 return id; 273 } 274 275 boolean cycle() throws Exception { 276 if (!initiated) 277 connect(); 278 else if (!connected) 279 finishConnect(); 280 else if (!written) 281 writeCycle(); 282 else if (!acked) 283 ackCycle(); 284 else if (!closed) 285 close(); 286 return closed; 287 } 288 289 private void ackCycle() throws Exception { 290 //log.println("acking from "+id); 291 int bytesRead = sc.read(ackBuffer); 292 if (bytesRead > 0) { 293 acked = true; 294 } 295 } 296 297 private void close() throws Exception { 298 sc.close(); 299 closed = true; 300 } 301 302 private void writeCycle() throws Exception { 303 log.println("writing from "+id); 304 int numBytesToWrite = rnd.nextInt(10)+1; 305 int newWriteTarget = dataWrittenIndex + numBytesToWrite; 306 if (newWriteTarget > totalDataLength) 307 newWriteTarget = totalDataLength; 308 data.limit(newWriteTarget); 309 int bytesWritten = sc.write(data); 310 if (bytesWritten > 0) 311 dataWrittenIndex += bytesWritten; 312 if (dataWrittenIndex == totalDataLength) { 313 written = true; 314 sc.socket().shutdownOutput(); 315 } 316 } 317 318 } 319 320 321 class RequestHandler implements Runnable { 322 private static Random rnd = new Random(); 323 private Selector selector; 324 private int connectionsHandled = 0; 325 private HashMap dataBin = new HashMap(); 326 PrintStream log; 327 328 public RequestHandler(Selector selector, PrintStream log) { 329 this.selector = selector; 330 this.log = log; 331 } 332 333 public void run() { 334 log.println("starting request handler..."); 335 int connectionsAccepted = 0; 336 337 Set nKeys = selector.keys(); 338 Set readyKeys = selector.selectedKeys(); 339 340 try { 341 while(connectionsHandled < SelectorTest.NUM_CLIENTS) { 342 int numKeys = selector.select(100); 343 344 // Process channels with data 345 synchronized (nKeys) { 346 if (readyKeys.size() > 0) { 347 Iterator i = readyKeys.iterator(); 348 while(i.hasNext()) { 349 SelectionKey sk = (SelectionKey)i.next(); 350 i.remove(); 351 SocketChannel sc = (SocketChannel)sk.channel(); 352 if (sc.isOpen()) 353 read(sk, sc); 354 } 355 } 356 } 357 358 // Give other threads a chance to run 359 if (numKeys == 0) { 360 try { 361 Thread.sleep(1); 362 } catch (Exception x) {} 363 } 364 } 365 } catch (Exception e) { 366 log.println("Unexpected error 1: "+e); 367 e.printStackTrace(); 368 } 369 } 370 371 private void read(SelectionKey sk, SocketChannel sc) throws Exception { 372 ByteBuffer bin = (ByteBuffer)dataBin.get(sc); 373 if (bin == null) { 374 if (rnd.nextBoolean()) 375 bin = ByteBuffer.allocateDirect(100); 376 else 377 bin = ByteBuffer.allocate(100); 378 dataBin.put(sc, bin); 379 } 380 381 int bytesRead = 0; 382 do { 383 bytesRead = sc.read(bin); 384 } while(bytesRead > 0); 385 386 if (bytesRead == -1) { 387 sk.interestOps(0); 388 bin.flip(); 389 int size = bin.limit(); 390 byte[] data = new byte[size]; 391 for(int j=0; j<size; j++) 392 data[j] = bin.get(); 393 String message = new String(data, "8859_1"); 394 connectionsHandled++; 395 acknowledge(sc); 396 log.println("Received >>>"+message + "<<<"); 397 log.println("Handled: "+connectionsHandled); 398 } 399 } 400 401 private void acknowledge(SocketChannel sc) throws Exception { 402 ByteBuffer ackBuffer = ByteBuffer.allocateDirect(10); 403 String s = "ack"; 404 ackBuffer.put(s.getBytes("8859_1")); 405 ackBuffer.flip(); 406 int bytesWritten = 0; 407 while(bytesWritten == 0) { 408 bytesWritten += sc.write(ackBuffer); 409 } 410 sc.close(); 411 } 412 }