1 /*
   2  * Copyright (c) 2002-2018, the original author or authors.
   3  *
   4  * This software is distributable under the BSD license. See the terms of the
   5  * BSD license in the documentation provided with this software.
   6  *
   7  * http://www.opensource.org/licenses/bsd-license.php
   8  */
   9 package jdk.internal.org.jline.utils;
  10 
  11 import java.io.IOException;
  12 import java.io.InterruptedIOException;
  13 import java.io.Reader;
  14 
  15 /**
  16  * This class wraps a regular reader and allows it to appear as if it
  17  * is non-blocking; that is, reads can be performed against it that timeout
  18  * if no data is seen for a period of time.  This effect is achieved by having
  19  * a separate thread perform all non-blocking read requests and then
  20  * waiting on the thread to complete.
  21  *
  22  * <p>VERY IMPORTANT NOTES
  23  * <ul>
  24  *   <li> This class is not thread safe. It expects at most one reader.
  25  *   <li> The {@link #shutdown()} method must be called in order to shut down
  26  *          the thread that handles blocking I/O.
  27  * </ul>
  28  * @since 2.7
  29  * @author Scott C. Gray &lt;scottgray1@gmail.com&gt;
  30  */
  31 public class NonBlockingReaderImpl
  32     extends NonBlockingReader
  33 {
  34     public static final int READ_EXPIRED = -2;
  35 
  36     private Reader in;                  // The actual input stream
  37     private int    ch   = READ_EXPIRED; // Recently read character
  38 
  39     private String      name;
  40     private boolean     threadIsReading      = false;
  41     private IOException exception            = null;
  42     private long        threadDelay          = 60 * 1000;
  43     private Thread      thread;
  44 
  45     /**
  46      * Creates a <code>NonBlockingReader</code> out of a normal blocking
  47      * reader. Note that this call also spawn a separate thread to perform the
  48      * blocking I/O on behalf of the thread that is using this class. The
  49      * {@link #shutdown()} method must be called in order to shut this thread down.
  50      * @param name The reader name
  51      * @param in The reader to wrap
  52      */
  53     public NonBlockingReaderImpl(String name, Reader in) {
  54         this.in = in;
  55         this.name = name;
  56     }
  57 
  58     private synchronized void startReadingThreadIfNeeded() {
  59         if (thread == null) {
  60             thread = new Thread(this::run);
  61             thread.setName(name + " non blocking reader thread");
  62             thread.setDaemon(true);
  63             thread.start();
  64         }
  65     }
  66 
  67     /**
  68      * Shuts down the thread that is handling blocking I/O. Note that if the
  69      * thread is currently blocked waiting for I/O it will not actually
  70      * shut down until the I/O is received.
  71      */
  72     public synchronized void shutdown() {
  73         if (thread != null) {
  74             notify();
  75         }
  76     }
  77 
  78     @Override
  79     public void close() throws IOException {
  80         /*
  81          * The underlying input stream is closed first. This means that if the
  82          * I/O thread was blocked waiting on input, it will be woken for us.
  83          */
  84         in.close();
  85         shutdown();
  86     }
  87 
  88     @Override
  89     public synchronized boolean ready() throws IOException {
  90         return ch >= 0 || in.ready();
  91     }
  92 
  93     /**
  94      * Attempts to read a character from the input stream for a specific
  95      * period of time.
  96      * @param timeout The amount of time to wait for the character
  97      * @return The character read, -1 if EOF is reached, or -2 if the
  98      *   read timed out.
  99      */
 100     protected synchronized int read(long timeout, boolean isPeek) throws IOException {
 101         /*
 102          * If the thread hit an IOException, we report it.
 103          */
 104         if (exception != null) {
 105             assert ch == READ_EXPIRED;
 106             IOException toBeThrown = exception;
 107             if (!isPeek)
 108                 exception = null;
 109             throw toBeThrown;
 110         }
 111 
 112         /*
 113          * If there was a pending character from the thread, then
 114          * we send it. If the timeout is 0L or the thread was shut down
 115          * then do a local read.
 116          */
 117         if (ch >= -1) {
 118             assert exception == null;
 119         }
 120         else if (!isPeek && timeout <= 0L && !threadIsReading) {
 121             ch = in.read();
 122         }
 123         else {
 124             /*
 125              * If the thread isn't reading already, then ask it to do so.
 126              */
 127             if (!threadIsReading) {
 128                 threadIsReading = true;
 129                 startReadingThreadIfNeeded();
 130                 notifyAll();
 131             }
 132 
 133             boolean isInfinite = (timeout <= 0L);
 134 
 135             /*
 136              * So the thread is currently doing the reading for us. So
 137              * now we play the waiting game.
 138              */
 139             while (isInfinite || timeout > 0L)  {
 140                 long start = System.currentTimeMillis ();
 141 
 142                 try {
 143                     if (Thread.interrupted()) {
 144                         throw new InterruptedException();
 145                     }
 146                     wait(timeout);
 147                 }
 148                 catch (InterruptedException e) {
 149                     exception = (IOException) new InterruptedIOException().initCause(e);
 150                 }
 151 
 152                 if (exception != null) {
 153                     assert ch == READ_EXPIRED;
 154 
 155                     IOException toBeThrown = exception;
 156                     if (!isPeek)
 157                         exception = null;
 158                     throw toBeThrown;
 159                 }
 160 
 161                 if (ch >= -1) {
 162                     assert exception == null;
 163                     break;
 164                 }
 165 
 166                 if (!isInfinite) {
 167                     timeout -= System.currentTimeMillis() - start;
 168                 }
 169             }
 170         }
 171 
 172         /*
 173          * ch is the character that was just read. Either we set it because
 174          * a local read was performed or the read thread set it (or failed to
 175          * change it).  We will return it's value, but if this was a peek
 176          * operation, then we leave it in place.
 177          */
 178         int ret = ch;
 179         if (!isPeek) {
 180             ch = READ_EXPIRED;
 181         }
 182         return ret;
 183     }
 184 
 185     private void run () {
 186         Log.debug("NonBlockingReader start");
 187         boolean needToRead;
 188 
 189         try {
 190             while (true) {
 191 
 192                 /*
 193                  * Synchronize to grab variables accessed by both this thread
 194                  * and the accessing thread.
 195                  */
 196                 synchronized (this) {
 197                     needToRead = this.threadIsReading;
 198 
 199                     try {
 200                         /*
 201                          * Nothing to do? Then wait.
 202                          */
 203                         if (!needToRead) {
 204                             wait(threadDelay);
 205                         }
 206                     } catch (InterruptedException e) {
 207                         /* IGNORED */
 208                     }
 209 
 210                     needToRead = this.threadIsReading;
 211                     if (!needToRead) {
 212                         return;
 213                     }
 214                 }
 215 
 216                 /*
 217                  * We're not shutting down, but we need to read. This cannot
 218                  * happen while we are holding the lock (which we aren't now).
 219                  */
 220                 int charRead = READ_EXPIRED;
 221                 IOException failure = null;
 222                 try {
 223                     charRead = in.read();
 224 //                    if (charRead < 0) {
 225 //                        continue;
 226 //                    }
 227                 } catch (IOException e) {
 228                     failure = e;
 229 //                    charRead = -1;
 230                 }
 231 
 232                 /*
 233                  * Re-grab the lock to update the state.
 234                  */
 235                 synchronized (this) {
 236                     exception = failure;
 237                     ch = charRead;
 238                     threadIsReading = false;
 239                     notify();
 240                 }
 241             }
 242         } catch (Throwable t) {
 243             Log.warn("Error in NonBlockingReader thread", t);
 244         } finally {
 245             Log.debug("NonBlockingReader shutdown");
 246             synchronized (this) {
 247                 thread = null;
 248                 threadIsReading = false;
 249             }
 250         }
 251     }
 252 
 253     public synchronized void clear() throws IOException {
 254         while (ready()) {
 255             read();
 256         }
 257     }
 258 }