/*
 * Copyright (c) 2002-2018, the original author or authors.
 *
 * This software is distributable under the BSD license. See the terms of the
 * BSD license in the documentation provided with this software.
 *
 * http://www.opensource.org/licenses/bsd-license.php
 */
package jdk.internal.org.jline.utils;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;

/**
 * This class wraps a regular reader and allows it to appear as if it
 * is non-blocking; that is, reads can be performed against it that timeout
 * if no data is seen for a period of time.  This effect is achieved by having
 * a separate thread perform all non-blocking read requests and then
 * waiting on the thread to complete.
 *
 * <p>VERY IMPORTANT NOTES
 * <ul>
 *   <li> This class is not thread safe. It expects at most one reader.
 *   <li> The {@link #shutdown()} method must be called in order to shut down
 *          the thread that handles blocking I/O.
 * </ul>
 * @since 2.7
 * @author Scott C. Gray &lt;scottgray1@gmail.com&gt;
 */
public class NonBlockingReaderImpl
    extends NonBlockingReader
{
    public static final int READ_EXPIRED = -2;

    /** Conversion factor used to turn {@link System#nanoTime()} deltas into milliseconds. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final Reader in;          // The actual input stream
    private int ch = READ_EXPIRED;    // Recently read character (READ_EXPIRED when none is pending)

    private final String name;        // Used to name the I/O thread
    private boolean threadIsReading = false;   // True while a read has been requested from the I/O thread
    private IOException exception = null;      // Failure waiting to be reported to the caller
    private final long threadDelay = 60 * 1000; // Idle time (ms) after which the I/O thread exits
    private Thread thread;                     // The I/O thread, or null when none is running

    /**
     * Creates a <code>NonBlockingReader</code> out of a normal blocking
     * reader. The thread that performs blocking I/O on behalf of the caller
     * is not started here; it is started lazily by the first read that has
     * to be delegated to it. The {@link #shutdown()} method must be called
     * in order to shut this thread down.
     * @param name The reader name
     * @param in The reader to wrap
     */
    public NonBlockingReaderImpl(String name, Reader in) {
        this.in = in;
        this.name = name;
    }

    /**
     * Starts the daemon I/O thread unless one is already registered.
     */
    private synchronized void startReadingThreadIfNeeded() {
        if (thread == null) {
            thread = new Thread(this::run);
            thread.setName(name + " non blocking reader thread");
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Shuts down the thread that is handling blocking I/O. Note that if the
     * thread is currently blocked waiting for I/O it will not actually
     * shut down until the I/O is received.
     */
    public synchronized void shutdown() {
        if (thread != null) {
            notify();
        }
    }

    /**
     * Closes the wrapped reader and then signals the I/O thread.
     *
     * @throws IOException if closing the wrapped reader fails
     */
    @Override
    public void close() throws IOException {
        /*
         * The underlying input stream is closed first. This means that if the
         * I/O thread was blocked waiting on input, it will be woken for us.
         * The shutdown signal is sent even when closing fails, so that a
         * failing close() does not leave the I/O thread unsignalled.
         */
        try {
            in.close();
        }
        finally {
            shutdown();
        }
    }

    /**
     * Reports whether a read can be satisfied immediately.
     *
     * @return true if a character is already pending or the wrapped reader
     *         reports that it is ready
     * @throws IOException if the wrapped reader fails
     */
    @Override
    public synchronized boolean ready() throws IOException {
        return ch >= 0 || in.ready();
    }

    /**
     * Attempts to read a character from the input stream for a specific
     * period of time.
     * @param timeout The amount of time to wait for the character, in
     *        milliseconds; a value of zero or less means wait without limit
     * @param isPeek If true, the character (or a pending exception) is left
     *        in place so that the next call reports it again
     * @return The character read, -1 if EOF is reached, or -2 if the
     *   read timed out.
     * @throws IOException if the underlying read failed, or an
     *         {@link InterruptedIOException} if the calling thread was
     *         interrupted while waiting
     */
    protected synchronized int read(long timeout, boolean isPeek) throws IOException {
        /*
         * If the thread hit an IOException, we report it.
         */
        rethrowPendingException(isPeek);

        /*
         * If there was a pending character from the thread, then
         * we send it. If this is a real (non-peek) read without a timeout
         * and the thread is not already reading, then do a local read.
         */
        if (ch >= -1) {
            assert exception == null;
        }
        else if (!isPeek && timeout <= 0L && !threadIsReading) {
            ch = in.read();
        }
        else {
            /*
             * If the thread isn't reading already, then ask it to do so.
             */
            if (!threadIsReading) {
                threadIsReading = true;
                startReadingThreadIfNeeded();
                notifyAll();
            }

            boolean isInfinite = (timeout <= 0L);

            /*
             * Elapsed time is measured against a fixed starting point on the
             * monotonic clock, so wall-clock adjustments cannot stretch or
             * shorten the timeout.
             */
            long startNanos = System.nanoTime();
            long remaining = timeout;

            /*
             * So the thread is currently doing the reading for us. So
             * now we play the waiting game.
             */
            while (isInfinite || remaining > 0L) {
                try {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    /*
                     * Object.wait(long) rejects negative values, so an
                     * "infinite" (zero or negative) timeout is always
                     * passed on as 0.
                     */
                    wait(isInfinite ? 0L : remaining);
                }
                catch (InterruptedException e) {
                    exception = (IOException) new InterruptedIOException().initCause(e);
                }

                rethrowPendingException(isPeek);

                if (ch >= -1) {
                    assert exception == null;
                    break;
                }

                if (!isInfinite) {
                    remaining = timeout - (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
                }
            }
        }

        /*
         * ch is the character that was just read. Either we set it because
         * a local read was performed or the read thread set it (or failed to
         * change it).  We will return its value, but if this was a peek
         * operation, then we leave it in place.
         */
        int ret = ch;
        if (!isPeek) {
            ch = READ_EXPIRED;
        }
        return ret;
    }

    /**
     * Throws the pending exception, if there is one. Must be called while
     * holding this object's monitor.
     *
     * @param isPeek if true the exception stays pending so that the next
     *        call reports it again; otherwise it is cleared
     * @throws IOException the pending exception
     */
    private void rethrowPendingException(boolean isPeek) throws IOException {
        if (exception != null) {
            assert ch == READ_EXPIRED;
            IOException toBeThrown = exception;
            if (!isPeek) {
                exception = null;
            }
            throw toBeThrown;
        }
    }

    /**
     * Body of the I/O thread: waits until a read is requested, performs the
     * blocking read without holding the lock, publishes the result and
     * notifies the waiting caller. The thread exits when it wakes up and no
     * read has been requested.
     */
    private void run () {
        Log.debug("NonBlockingReader start");
        boolean needToRead;

        try {
            while (true) {

                /*
                 * Synchronize to grab variables accessed by both this thread
                 * and the accessing thread.
                 */
                synchronized (this) {
                    needToRead = this.threadIsReading;

                    try {
                        /*
                         * Nothing to do? Then wait.
                         */
                        if (!needToRead) {
                            wait(threadDelay);
                        }
                    } catch (InterruptedException e) {
                        /* IGNORED */
                    }

                    needToRead = this.threadIsReading;
                    if (!needToRead) {
                        /*
                         * Deregister while still holding the lock that was
                         * used to decide to exit. Otherwise a read requested
                         * between this return and the cleanup below would
                         * see a thread that is about to die, start no new
                         * one, and then have its request wiped.
                         */
                        thread = null;
                        return;
                    }
                }

                /*
                 * We're not shutting down, but we need to read. This cannot
                 * happen while we are holding the lock (which we aren't now).
                 */
                int charRead = READ_EXPIRED;
                IOException failure = null;
                try {
                    charRead = in.read();
                } catch (IOException e) {
                    failure = e;
                }

                /*
                 * Re-grab the lock to update the state.
                 */
                synchronized (this) {
                    exception = failure;
                    ch = charRead;
                    threadIsReading = false;
                    notify();
                }
            }
        } catch (Throwable t) {
            Log.warn("Error in NonBlockingReader thread", t);
        } finally {
            Log.debug("NonBlockingReader shutdown");
            synchronized (this) {
                /*
                 * Only reset the shared state if this thread is still the
                 * registered I/O thread. After an idle exit the field has
                 * already been cleared, and a replacement thread (with a
                 * pending read request) may have been registered since.
                 */
                if (thread == Thread.currentThread()) {
                    thread = null;
                    threadIsReading = false;
                }
            }
        }
    }

    /**
     * Discards all input that is currently available without blocking.
     *
     * @throws IOException if reading from the wrapped reader fails
     */
    public synchronized void clear() throws IOException {
        while (ready()) {
            read();
        }
    }
}