1 /*
   2  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 /* @test
  25  * @bug 4607272 6822643 6830721 6842687
  26  * @summary Unit test for AsynchronousFileChannel
  27  */
  28 
  29 import java.nio.file.*;
  30 import java.nio.channels.*;
  31 import java.nio.ByteBuffer;
  32 import java.io.File;
  33 import java.io.IOException;
  34 import java.util.*;
  35 import java.util.concurrent.*;
  36 import java.util.concurrent.atomic.AtomicReference;
  37 import static java.nio.file.StandardOpenOption.*;
  38 
  39 public class Basic {
  40 
  41     private static final Random rand = new Random();
  42 
  43     public static void main(String[] args) throws IOException {
  44         // create temporary file
  45         File blah = File.createTempFile("blah", null);
  46         blah.deleteOnExit();
  47 
  48         AsynchronousFileChannel ch = AsynchronousFileChannel
  49             .open(blah.toPath(), READ, WRITE);
  50         try {
  51             // run tests
  52             testUsingCompletionHandlers(ch);
  53             testUsingWaitOnResult(ch);
  54             testInterruptHandlerThread(ch);
  55         } finally {
  56             ch.close();
  57         }
  58 
  59         // run test that expects channel to be closed
  60         testClosedChannel(ch);
  61 
  62         // these tests open the file themselves
  63         testLocking(blah.toPath());
  64         testCustomThreadPool(blah.toPath());
  65         testAsynchronousClose(blah.toPath());
  66         testCancel(blah.toPath());
  67         testTruncate(blah.toPath());
  68 
  69         // eagerly clean-up
  70         blah.delete();
  71     }
  72 
  73     /*
  74      * Generate buffer with random contents
  75      * Writes buffer to file using a CompletionHandler to consume the result
  76      *    of each write operation
  77      * Reads file to EOF to a new buffer using a CompletionHandler to consume
  78      *    the result of each read operation
  79      * Compares buffer contents
  80      */
  81     static void testUsingCompletionHandlers(AsynchronousFileChannel ch)
  82         throws IOException
  83     {
  84         System.out.println("testUsingCompletionHandlers");
  85 
  86         ch.truncate(0L);
  87 
  88         // generate buffer with random elements and write it to file
  89         ByteBuffer src = genBuffer();
  90         writeFully(ch, src, 0L);
  91 
  92         // read to EOF or buffer is full
  93         ByteBuffer dst = (rand.nextBoolean()) ?
  94             ByteBuffer.allocateDirect(src.capacity()) :
  95             ByteBuffer.allocate(src.capacity());
  96         readAll(ch, dst, 0L);
  97 
  98         // check buffers are the same
  99         src.flip();
 100         dst.flip();
 101         if (!src.equals(dst)) {
 102             throw new RuntimeException("Contents differ");
 103         }
 104     }
 105 
 106     /*
 107      * Generate buffer with random contents
 108      * Writes buffer to file, invoking the Future's get method to wait for
 109      *    each write operation to complete
 110      * Reads file to EOF to a new buffer, invoking the Future's get method to
 111      *    wait for each write operation to complete
 112      * Compares buffer contents
 113      */
 114     static void testUsingWaitOnResult(AsynchronousFileChannel ch)
 115         throws IOException
 116     {
 117         System.out.println("testUsingWaitOnResult");
 118 
 119         ch.truncate(0L);
 120 
 121         // generate buffer
 122         ByteBuffer src = genBuffer();
 123 
 124         // write buffer completely to file
 125         long position = 0L;
 126         while (src.hasRemaining()) {
 127             Future<Integer> result = ch.write(src, position);
 128             try {
 129                 int n = result.get();
 130                 // update position
 131                 position += n;
 132             } catch (ExecutionException x) {
 133                 throw new RuntimeException(x.getCause());
 134             } catch (InterruptedException x) {
 135                 throw new RuntimeException(x);
 136             }
 137         }
 138 
 139         // read file into new buffer
 140         ByteBuffer dst = (rand.nextBoolean()) ?
 141             ByteBuffer.allocateDirect(src.capacity()) :
 142             ByteBuffer.allocate(src.capacity());
 143         position = 0L;
 144         int n;
 145         do {
 146             Future<Integer> result = ch.read(dst, position);
 147             try {
 148                 n = result.get();
 149 
 150                 // update position
 151                 if (n > 0) position += n;
 152             } catch (ExecutionException x) {
 153                 throw new RuntimeException(x.getCause());
 154             } catch (InterruptedException x) {
 155                 throw new RuntimeException(x);
 156             }
 157         } while (n > 0);
 158 
 159         // check buffers are the same
 160         src.flip();
 161         dst.flip();
 162         if (!src.equals(dst)) {
 163             throw new RuntimeException("Contents differ");
 164         }
 165     }
 166 
 167     // exercise lock methods
 168     static void testLocking(Path file) throws IOException {
 169         System.out.println("testLocking");
 170 
 171         AsynchronousFileChannel ch = AsynchronousFileChannel
 172             .open(file, READ, WRITE);
 173         FileLock fl;
 174         try {
 175             // test 1 - acquire lock and check that tryLock throws
 176             // OverlappingFileLockException
 177             try {
 178                 fl = ch.lock().get();
 179             } catch (ExecutionException x) {
 180                 throw new RuntimeException(x);
 181             } catch (InterruptedException x) {
 182                 throw new RuntimeException("Should not be interrupted");
 183             }
 184             if (!fl.acquiredBy().equals(ch))
 185                 throw new RuntimeException("FileLock#acquiredBy returned incorrect channel");
 186             try {
 187                 ch.tryLock();
 188                 throw new RuntimeException("OverlappingFileLockException expected");
 189             } catch (OverlappingFileLockException x) {
 190             }
 191             fl.release();
 192 
 193             // test 2 - acquire try and check that lock throws OverlappingFileLockException
 194             fl = ch.tryLock();
 195             if (fl == null)
 196                 throw new RuntimeException("Unable to acquire lock");
 197             try {
 198                 ch.lock((Void)null, new CompletionHandler<FileLock,Void> () {
 199                     public void completed(FileLock result, Void att) {
 200                     }
 201                     public void failed(Throwable exc, Void att) {
 202                     }
 203                 });
 204                 throw new RuntimeException("OverlappingFileLockException expected");
 205             } catch (OverlappingFileLockException x) {
 206             }
 207         } finally {
 208             ch.close();
 209         }
 210 
 211         // test 3 - channel is closed so FileLock should no longer be valid
 212         if (fl.isValid())
 213             throw new RuntimeException("FileLock expected to be invalid");
 214     }
 215 
 216     // interrupt should not close channel
 217     static void testInterruptHandlerThread(final AsynchronousFileChannel ch) {
 218         System.out.println("testInterruptHandlerThread");
 219 
 220         ByteBuffer buf = ByteBuffer.allocateDirect(100);
 221         final CountDownLatch latch = new CountDownLatch(1);
 222 
 223         ch.read(buf, 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 224             public void completed(Integer result, Void att) {
 225                 try {
 226                     Thread.currentThread().interrupt();
 227                     long size = ch.size();
 228                     latch.countDown();
 229                 } catch (IOException x) {
 230                     x.printStackTrace();
 231                 }
 232             }
 233             public void failed(Throwable exc, Void att) {
 234             }
 235         });
 236 
 237         // wait for handler to complete
 238         await(latch);
 239     }
 240 
 241     // invoke method on closed channel
 242     static void testClosedChannel(AsynchronousFileChannel ch) {
 243         System.out.println("testClosedChannel");
 244 
 245         if (ch.isOpen())
 246             throw new RuntimeException("Channel should be closed");
 247 
 248         ByteBuffer buf = ByteBuffer.allocateDirect(100);
 249 
 250         // check read fails with ClosedChannelException
 251         try {
 252             ch.read(buf, 0L).get();
 253             throw new RuntimeException("ExecutionException expected");
 254         } catch (ExecutionException x) {
 255             if (!(x.getCause() instanceof ClosedChannelException))
 256                 throw new RuntimeException("Cause of ClosedChannelException expected");
 257         } catch (InterruptedException x) {
 258         }
 259 
 260         // check write fails with ClosedChannelException
 261         try {
 262             ch.write(buf, 0L).get();
 263             throw new RuntimeException("ExecutionException expected");
 264         } catch (ExecutionException x) {
 265             if (!(x.getCause() instanceof ClosedChannelException))
 266                 throw new RuntimeException("Cause of ClosedChannelException expected");
 267         } catch (InterruptedException x) {
 268         }
 269 
 270         // check lock fails with ClosedChannelException
 271         try {
 272             ch.lock().get();
 273             throw new RuntimeException("ExecutionException expected");
 274         } catch (ExecutionException x) {
 275             if (!(x.getCause() instanceof ClosedChannelException))
 276                 throw new RuntimeException("Cause of ClosedChannelException expected");
 277         } catch (InterruptedException x) {
 278         }
 279     }
 280 
 281 
 282     // exercise custom thread pool
 283     static void testCustomThreadPool(Path file) throws IOException {
 284         System.out.println("testCustomThreadPool");
 285 
 286         // records threads that are created
 287         final List<Thread> threads = new ArrayList<Thread>();
 288 
 289         ThreadFactory threadFactory = new ThreadFactory() {
 290              @Override
 291              public Thread newThread(Runnable r) {
 292                  Thread t = new Thread(r);
 293                  t.setDaemon(true);
 294                  synchronized (threads) {
 295                      threads.add(t);
 296                  }
 297                  return t;
 298              }
 299         };
 300 
 301         // exercise tests with varied number of threads
 302         for (int nThreads=1; nThreads<=5; nThreads++) {
 303             synchronized (threads) {
 304                 threads.clear();
 305             }
 306             ExecutorService executor = Executors.newFixedThreadPool(nThreads, threadFactory);
 307             Set<StandardOpenOption> opts = EnumSet.of(WRITE);
 308             AsynchronousFileChannel ch = AsynchronousFileChannel.open(file, opts, executor);
 309             try {
 310                 for (int i=0; i<10; i++) {
 311                     // do I/O operation to see which thread invokes the completion handler
 312                     final AtomicReference<Thread> invoker = new AtomicReference<Thread>();
 313                     final CountDownLatch latch = new CountDownLatch(1);
 314 
 315                     ch.write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 316                         public void completed(Integer result, Void att) {
 317                             invoker.set(Thread.currentThread());
 318                             latch.countDown();
 319                         }
 320                         public void failed(Throwable exc, Void att) {
 321                         }
 322                     });
 323                     await(latch);
 324 
 325                     // check invoker
 326                     boolean found = false;
 327                     synchronized (threads) {
 328                         for (Thread t: threads) {
 329                             if (t == invoker.get()) {
 330                                 found = true;
 331                                 break;
 332                             }
 333                         }
 334                     }
 335                     if (!found)
 336                         throw new RuntimeException("Invoker thread not found");
 337                 }
 338             } finally {
 339                 ch.close();
 340                 executor.shutdown();
 341             }
 342         }
 343 
 344 
 345         // test sharing a thread pool between many channels
 346         ExecutorService executor = Executors
 347             .newFixedThreadPool(1+rand.nextInt(10), threadFactory);
 348         final int n = 50 + rand.nextInt(50);
 349         AsynchronousFileChannel[] channels = new AsynchronousFileChannel[n];
 350         try {
 351             for (int i=0; i<n; i++) {
 352                 Set<StandardOpenOption> opts = EnumSet.of(WRITE);
 353                 channels[i] = AsynchronousFileChannel.open(file, opts, executor);
 354                 final CountDownLatch latch = new CountDownLatch(1);
 355                 channels[i].write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() {
 356                     public void completed(Integer result, Void att) {
 357                         latch.countDown();
 358                     }
 359                     public void failed(Throwable exc, Void att) {
 360                     }
 361                 });
 362                 await(latch);
 363 
 364                 // close ~half the channels
 365                 if (rand.nextBoolean())
 366                     channels[i].close();
 367             }
 368         } finally {
 369             // close remaining channels
 370             for (int i=0; i<n; i++) {
 371                 if (channels[i] != null) channels[i].close();
 372             }
 373             executor.shutdown();
 374         }
 375     }
 376 
 377     // exercise asynchronous close
 378     static void testAsynchronousClose(Path file) throws IOException {
 379         System.out.println("testAsynchronousClose");
 380 
 381         // create file
 382         AsynchronousFileChannel ch = AsynchronousFileChannel
 383             .open(file, WRITE, TRUNCATE_EXISTING);
 384         long size = 0L;
 385         do {
 386             ByteBuffer buf = genBuffer();
 387             int n = buf.remaining();
 388             writeFully(ch, buf, size);
 389             size += n;
 390         } while (size < (50L * 1024L * 1024L));
 391 
 392         ch.close();
 393 
 394         ch = AsynchronousFileChannel.open(file, WRITE, SYNC);
 395 
 396         // randomize number of writers, buffer size, and positions
 397 
 398         int nwriters = 1 + rand.nextInt(8);
 399         ByteBuffer[] buf = new ByteBuffer[nwriters];
 400         long[] position = new long[nwriters];
 401         for (int i=0; i<nwriters; i++) {
 402             buf[i] = genBuffer();
 403             position[i] = rand.nextInt((int)size);
 404         }
 405 
 406         // initiate I/O
 407         Future[] result = new Future[nwriters];
 408         for (int i=0; i<nwriters; i++) {
 409             result[i] = ch.write(buf[i], position[i]);
 410         }
 411 
 412         // close file
 413         ch.close();
 414 
 415         // write operations should complete or fail with AsynchronousCloseException
 416         for (int i=0; i<nwriters; i++) {
 417             try {
 418                 result[i].get();
 419             } catch (ExecutionException x) {
 420                 Throwable cause = x.getCause();
 421                 if (!(cause instanceof AsynchronousCloseException))
 422                     throw new RuntimeException(cause);
 423             } catch (CancellationException  x) {
 424                 throw new RuntimeException(x);   // should not happen
 425             } catch (InterruptedException x) {
 426                 throw new RuntimeException(x);   // should not happen
 427             }
 428         }
 429     }
 430 
 431     // exercise cancel method
 432     static void testCancel(Path file) throws IOException {
 433         System.out.println("testCancel");
 434 
 435         for (int i=0; i<2; i++) {
 436             boolean mayInterruptIfRunning = (i == 0) ? false : true;
 437 
 438             // open with SYNC option to improve chances that write will not
 439             // complete immediately
 440             AsynchronousFileChannel ch = AsynchronousFileChannel
 441                 .open(file, WRITE, SYNC);
 442 
 443             // start write operation
 444             Future<Integer> res = ch.write(genBuffer(), 0L);
 445 
 446             // cancel operation
 447             boolean cancelled = res.cancel(mayInterruptIfRunning);
 448 
 449             // check post-conditions
 450             if (!res.isDone())
 451                 throw new RuntimeException("isDone should return true");
 452             if (res.isCancelled() != cancelled)
 453                 throw new RuntimeException("isCancelled not consistent");
 454             try {
 455                 res.get();
 456                 if (cancelled)
 457                     throw new RuntimeException("CancellationException expected");
 458             } catch (CancellationException x) {
 459                 if (!cancelled)
 460                     throw new RuntimeException("CancellationException not expected");
 461             } catch (ExecutionException x) {
 462                 throw new RuntimeException(x);
 463             } catch (InterruptedException x) {
 464                 throw new RuntimeException(x);
 465             }
 466             try {
 467                 res.get(1, TimeUnit.SECONDS);
 468                 if (cancelled)
 469                     throw new RuntimeException("CancellationException expected");
 470             } catch (CancellationException x) {
 471                 if (!cancelled)
 472                     throw new RuntimeException("CancellationException not expected");
 473             } catch (ExecutionException x) {
 474                 throw new RuntimeException(x);
 475             } catch (TimeoutException x) {
 476                 throw new RuntimeException(x);
 477             } catch (InterruptedException x) {
 478                 throw new RuntimeException(x);
 479             }
 480 
 481             ch.close();
 482         }
 483     }
 484 
 485     // exercise truncate method
 486     static void testTruncate(Path file) throws IOException {
 487         System.out.println("testTruncate");
 488 
 489         // basic tests
 490         AsynchronousFileChannel ch = AsynchronousFileChannel
 491             .open(file, CREATE, WRITE, TRUNCATE_EXISTING);
 492         try {
 493             writeFully(ch, genBuffer(), 0L);
 494             long size = ch.size();
 495 
 496             // attempt to truncate to a size greater than the current size
 497             if (ch.truncate(size + 1L).size() != size)
 498                 throw new RuntimeException("Unexpected size after truncation");
 499 
 500             // truncate file
 501             if (ch.truncate(size - 1L).size() != (size - 1L))
 502                 throw new RuntimeException("Unexpected size after truncation");
 503 
 504             // invalid size
 505             try {
 506                 ch.truncate(-1L);
 507                 throw new RuntimeException("IllegalArgumentException expected");
 508             } catch (IllegalArgumentException e) { }
 509 
 510         } finally {
 511             ch.close();
 512         }
 513 
 514         // channel is closed
 515         try {
 516             ch.truncate(0L);
 517             throw new RuntimeException("ClosedChannelException expected");
 518         } catch (ClosedChannelException  e) { }
 519 
 520         // channel is read-only
 521         ch = AsynchronousFileChannel.open(file, READ);
 522         try {
 523             try {
 524             ch.truncate(0L);
 525                 throw new RuntimeException("NonWritableChannelException expected");
 526             } catch (NonWritableChannelException  e) { }
 527         } finally {
 528             ch.close();
 529         }
 530     }
 531 
 532     // returns ByteBuffer with random bytes
 533     static ByteBuffer genBuffer() {
 534         int size = 1024 + rand.nextInt(16000);
 535         byte[] buf = new byte[size];
 536         boolean useDirect = rand.nextBoolean();
 537         if (useDirect) {
 538             ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
 539             bb.put(buf);
 540             bb.flip();
 541             return bb;
 542         } else {
 543             return ByteBuffer.wrap(buf);
 544         }
 545     }
 546 
 547     // writes all remaining bytes in the buffer to the given channel at the
 548     // given position
 549     static void writeFully(final AsynchronousFileChannel ch,
 550                            final ByteBuffer src,
 551                            long position)
 552     {
 553         final CountDownLatch latch = new CountDownLatch(1);
 554 
 555         // use position as attachment
 556         ch.write(src, position, position, new CompletionHandler<Integer,Long>() {
 557             public void completed(Integer result, Long position) {
 558                 int n = result;
 559                 if (src.hasRemaining()) {
 560                     long p = position + n;
 561                     ch.write(src, p, p, this);
 562                 } else {
 563                     latch.countDown();
 564                 }
 565             }
 566             public void failed(Throwable exc, Long position) {
 567             }
 568         });
 569 
 570         // wait for writes to complete
 571         await(latch);
 572     }
 573 
 574     static void readAll(final AsynchronousFileChannel ch,
 575                         final ByteBuffer dst,
 576                        long position)
 577     {
 578         final CountDownLatch latch = new CountDownLatch(1);
 579 
 580         // use position as attachment
 581         ch.read(dst, position, position, new CompletionHandler<Integer,Long>() {
 582             public void completed(Integer result, Long position) {
 583                 int n = result;
 584                 if (n > 0) {
 585                     long p = position + n;
 586                     ch.read(dst, p, p, this);
 587                 } else {
 588                     latch.countDown();
 589                 }
 590             }
 591             public void failed(Throwable exc, Long position) {
 592             }
 593         });
 594 
 595         // wait for reads to complete
 596         await(latch);
 597     }
 598 
 599     static void await(CountDownLatch latch) {
 600         // wait until done
 601         boolean done = false;
 602         while (!done) {
 603             try {
 604                 latch.await();
 605                 done = true;
 606             } catch (InterruptedException x) { }
 607         }
 608     }
 609 }