1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.lang.System.Logger.Level; 31 import java.net.InetSocketAddress; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.SocketChannel; 34 import java.util.Arrays; 35 import java.util.IdentityHashMap; 36 import java.util.List; 37 import java.util.Map; 38 import java.util.TreeMap; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.CompletionStage; 41 import java.util.concurrent.ConcurrentLinkedDeque; 42 import java.util.concurrent.Flow; 43 import java.util.function.BiPredicate; 44 import java.util.function.Predicate; 45 import java.net.http.HttpClient; 46 import java.net.http.HttpClient.Version; 47 import java.net.http.HttpHeaders; 48 import jdk.internal.net.http.common.Demand; 49 import jdk.internal.net.http.common.FlowTube; 50 import jdk.internal.net.http.common.SequentialScheduler; 51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; 52 import jdk.internal.net.http.common.Log; 53 import jdk.internal.net.http.common.Utils; 54 import static java.net.http.HttpClient.Version.HTTP_2; 55 56 /** 57 * Wraps socket channel layer and takes care of SSL also. 58 * 59 * Subtypes are: 60 * PlainHttpConnection: regular direct TCP connection to server 61 * PlainProxyConnection: plain text proxy connection 62 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server 63 * AsyncSSLConnection: TLS channel direct to server 64 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel 65 */ 66 abstract class HttpConnection implements Closeable { 67 68 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 69 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 70 final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( 71 () -> "HttpConnection(SocketTube(?))", DEBUG); 72 73 /** The address this connection is connected to. Could be a server or a proxy. */ 74 final InetSocketAddress address; 75 private final HttpClientImpl client; 76 private final TrailingOperations trailingOperations; 77 78 HttpConnection(InetSocketAddress address, HttpClientImpl client) { 79 this.address = address; 80 this.client = client; 81 trailingOperations = new TrailingOperations(); 82 } 83 84 private static final class TrailingOperations { 85 private final Map<CompletionStage<?>, Boolean> operations = 86 new IdentityHashMap<>(); 87 void add(CompletionStage<?> cf) { 88 synchronized(operations) { 89 cf.whenComplete((r,t)-> remove(cf)); 90 operations.put(cf, Boolean.TRUE); 91 } 92 } 93 boolean remove(CompletionStage<?> cf) { 94 synchronized(operations) { 95 return operations.remove(cf); 96 } 97 } 98 } 99 100 final void addTrailingOperation(CompletionStage<?> cf) { 101 trailingOperations.add(cf); 102 } 103 104 // final void removeTrailingOperation(CompletableFuture<?> cf) { 105 // trailingOperations.remove(cf); 106 // } 107 108 final HttpClientImpl client() { 109 return client; 110 } 111 112 //public abstract void connect() throws IOException, InterruptedException; 113 114 public abstract CompletableFuture<Void> connectAsync(); 115 116 /** Tells whether, or not, this connection is connected to its destination. */ 117 abstract boolean connected(); 118 119 /** Tells whether, or not, this connection is secure ( over SSL ) */ 120 abstract boolean isSecure(); 121 122 /** Tells whether, or not, this connection is proxied. */ 123 abstract boolean isProxied(); 124 125 /** Tells whether, or not, this connection is open. */ 126 final boolean isOpen() { 127 return channel().isOpen() && 128 (connected() ? !getConnectionFlow().isFinished() : true); 129 } 130 131 interface HttpPublisher extends FlowTube.TubePublisher { 132 void enqueue(List<ByteBuffer> buffers) throws IOException; 133 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; 134 void signalEnqueued() throws IOException; 135 } 136 137 /** 138 * Returns the HTTP publisher associated with this connection. May be null 139 * if invoked before connecting. 140 */ 141 abstract HttpPublisher publisher(); 142 143 // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS 144 private static final Predicate<String> testRequiredHTTP2TLSVersion = proto -> 145 proto.equals("TLSv1.2") || proto.equals("TLSv1.3"); 146 147 /** 148 * Returns true if the given client's SSL parameter protocols contains at 149 * least one TLS version that HTTP/2 requires. 150 */ 151 private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) { 152 String[] protos = client.sslParameters().getProtocols(); 153 if (protos != null) { 154 return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent(); 155 } else { 156 return false; 157 } 158 } 159 160 /** 161 * Factory for retrieving HttpConnections. A connection can be retrieved 162 * from the connection pool, or a new one created if none available. 163 * 164 * The given {@code addr} is the ultimate destination. Any proxies, 165 * etc, are determined from the request. Returns a concrete instance which 166 * is one of the following: 167 * {@link PlainHttpConnection} 168 * {@link PlainTunnelingConnection} 169 * 170 * The returned connection, if not from the connection pool, must have its, 171 * connect() or connectAsync() method invoked, which ( when it completes 172 * successfully ) renders the connection usable for requests. 173 */ 174 public static HttpConnection getConnection(InetSocketAddress addr, 175 HttpClientImpl client, 176 HttpRequestImpl request, 177 Version version) { 178 HttpConnection c = null; 179 InetSocketAddress proxy = request.proxy(); 180 if (proxy != null && proxy.isUnresolved()) { 181 // The default proxy selector may select a proxy whose address is 182 // unresolved. We must resolve the address before connecting to it. 183 proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); 184 } 185 boolean secure = request.secure(); 186 ConnectionPool pool = client.connectionPool(); 187 188 if (!secure) { 189 c = pool.getConnection(false, addr, proxy); 190 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { 191 final HttpConnection conn = c; 192 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() 193 + ": plain connection retrieved from HTTP/1.1 pool"); 194 return c; 195 } else { 196 return getPlainConnection(addr, proxy, request, client); 197 } 198 } else { // secure 199 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool 200 c = pool.getConnection(true, addr, proxy); 201 } 202 if (c != null && c.isOpen()) { 203 final HttpConnection conn = c; 204 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() 205 + ": SSL connection retrieved from HTTP/1.1 pool"); 206 return c; 207 } else { 208 String[] alpn = null; 209 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) { 210 alpn = new String[] { "h2", "http/1.1" }; 211 } 212 return getSSLConnection(addr, proxy, alpn, request, client); 213 } 214 } 215 } 216 217 private static HttpConnection getSSLConnection(InetSocketAddress addr, 218 InetSocketAddress proxy, 219 String[] alpn, 220 HttpRequestImpl request, 221 HttpClientImpl client) { 222 if (proxy != null) 223 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy, 224 proxyTunnelHeaders(request)); 225 else 226 return new AsyncSSLConnection(addr, client, alpn); 227 } 228 229 /** 230 * This method is used to build a filter that will accept or 231 * veto (header-name, value) tuple for transmission on the 232 * wire. 233 * The filter is applied to the headers when sending the headers 234 * to the remote party. 235 * Which tuple is accepted/vetoed depends on: 236 * <pre> 237 * - whether the connection is a tunnel connection 238 * [talking to a server through a proxy tunnel] 239 * - whether the method is CONNECT 240 * [establishing a CONNECT tunnel through a proxy] 241 * - whether the request is using a proxy 242 * (and the connection is not a tunnel) 243 * [talking to a server through a proxy] 244 * - whether the request is a direct connection to 245 * a server (no tunnel, no proxy). 246 * </pre> 247 * @param request 248 * @return 249 */ 250 BiPredicate<String,List<String>> headerFilter(HttpRequestImpl request) { 251 if (isTunnel()) { 252 // talking to a server through a proxy tunnel 253 // don't send proxy-* headers to a plain server 254 assert !request.isConnect(); 255 return Utils.NO_PROXY_HEADERS_FILTER; 256 } else if (request.isConnect()) { 257 // establishing a proxy tunnel 258 // check for proxy tunnel disabled schemes 259 // assert !this.isTunnel(); 260 assert request.proxy() == null; 261 return Utils.PROXY_TUNNEL_FILTER; 262 } else if (request.proxy() != null) { 263 // talking to a server through a proxy (no tunnel) 264 // check for proxy disabled schemes 265 // assert !isTunnel() && !request.isConnect(); 266 return Utils.PROXY_FILTER; 267 } else { 268 // talking to a server directly (no tunnel, no proxy) 269 // don't send proxy-* headers to a plain server 270 // assert request.proxy() == null && !request.isConnect(); 271 return Utils.NO_PROXY_HEADERS_FILTER; 272 } 273 } 274 275 // Composes a new immutable HttpHeaders that combines the 276 // user and system header but only keeps those headers that 277 // start with "proxy-" 278 private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) { 279 Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); 280 combined.putAll(request.getSystemHeaders().map()); 281 combined.putAll(request.headers().map()); // let user override system 282 283 // keep only proxy-* - and also strip authorization headers 284 // for disabled schemes 285 return ImmutableHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER); 286 } 287 288 /* Returns either a plain HTTP connection or a plain tunnelling connection 289 * for proxied WebSocket */ 290 private static HttpConnection getPlainConnection(InetSocketAddress addr, 291 InetSocketAddress proxy, 292 HttpRequestImpl request, 293 HttpClientImpl client) { 294 if (request.isWebSocket() && proxy != null) 295 return new PlainTunnelingConnection(addr, proxy, client, 296 proxyTunnelHeaders(request)); 297 298 if (proxy == null) 299 return new PlainHttpConnection(addr, client); 300 else 301 return new PlainProxyConnection(proxy, client); 302 } 303 304 void closeOrReturnToCache(HttpHeaders hdrs) { 305 if (hdrs == null) { 306 // the connection was closed by server, eof 307 close(); 308 return; 309 } 310 if (!isOpen()) { 311 return; 312 } 313 HttpClientImpl client = client(); 314 if (client == null) { 315 close(); 316 return; 317 } 318 ConnectionPool pool = client.connectionPool(); 319 boolean keepAlive = hdrs.firstValue("Connection") 320 .map((s) -> !s.equalsIgnoreCase("close")) 321 .orElse(true); 322 323 if (keepAlive) { 324 Log.logTrace("Returning connection to the pool: {0}", this); 325 pool.returnToPool(this); 326 } else { 327 close(); 328 } 329 } 330 331 /* Tells whether or not this connection is a tunnel through a proxy */ 332 boolean isTunnel() { return false; } 333 334 abstract SocketChannel channel(); 335 336 final InetSocketAddress address() { 337 return address; 338 } 339 340 abstract ConnectionPool.CacheKey cacheKey(); 341 342 /** 343 * Closes this connection, by returning the socket to its connection pool. 344 */ 345 @Override 346 public abstract void close(); 347 348 abstract void shutdownInput() throws IOException; 349 350 abstract void shutdownOutput() throws IOException; 351 352 // Support for WebSocket/RawChannelImpl which unfortunately 353 // still depends on synchronous read/writes. 354 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 355 abstract static class DetachedConnectionChannel implements Closeable { 356 DetachedConnectionChannel() {} 357 abstract SocketChannel channel(); 358 abstract long write(ByteBuffer[] buffers, int start, int number) 359 throws IOException; 360 abstract void shutdownInput() throws IOException; 361 abstract void shutdownOutput() throws IOException; 362 abstract ByteBuffer read() throws IOException; 363 @Override 364 public abstract void close(); 365 @Override 366 public String toString() { 367 return this.getClass().getSimpleName() + ": " + channel().toString(); 368 } 369 } 370 371 // Support for WebSocket/RawChannelImpl which unfortunately 372 // still depends on synchronous read/writes. 373 // It should be removed when RawChannelImpl moves to using asynchronous APIs. 374 abstract DetachedConnectionChannel detachChannel(); 375 376 abstract FlowTube getConnectionFlow(); 377 378 /** 379 * A publisher that makes it possible to publish (write) 380 * ordered (normal priority) and unordered (high priority) 381 * buffers downstream. 382 */ 383 final class PlainHttpPublisher implements HttpPublisher { 384 final Object reading; 385 PlainHttpPublisher() { 386 this(new Object()); 387 } 388 PlainHttpPublisher(Object readingLock) { 389 this.reading = readingLock; 390 } 391 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); 392 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 393 volatile HttpWriteSubscription subscription; 394 final SequentialScheduler writeScheduler = 395 new SequentialScheduler(this::flushTask); 396 @Override 397 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 398 synchronized (reading) { 399 //assert this.subscription == null; 400 //assert this.subscriber == null; 401 if (subscription == null) { 402 subscription = new HttpWriteSubscription(); 403 } 404 this.subscriber = subscriber; 405 } 406 // TODO: should we do this in the flow? 407 subscriber.onSubscribe(subscription); 408 signal(); 409 } 410 411 void flushTask(DeferredCompleter completer) { 412 try { 413 HttpWriteSubscription sub = subscription; 414 if (sub != null) sub.flush(); 415 } finally { 416 completer.complete(); 417 } 418 } 419 420 void signal() { 421 writeScheduler.runOrSchedule(); 422 } 423 424 final class HttpWriteSubscription implements Flow.Subscription { 425 final Demand demand = new Demand(); 426 427 @Override 428 public void request(long n) { 429 if (n <= 0) throw new IllegalArgumentException("non-positive request"); 430 demand.increase(n); 431 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of " 432 + n + " from " 433 + getConnectionFlow()); 434 writeScheduler.runOrSchedule(); 435 } 436 437 @Override 438 public void cancel() { 439 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by " 440 + getConnectionFlow()); 441 } 442 443 void flush() { 444 while (!queue.isEmpty() && demand.tryDecrement()) { 445 List<ByteBuffer> elem = queue.poll(); 446 debug.log(Level.DEBUG, () -> "HttpPublisher: sending " 447 + Utils.remaining(elem) + " bytes (" 448 + elem.size() + " buffers) to " 449 + getConnectionFlow()); 450 subscriber.onNext(elem); 451 } 452 } 453 } 454 455 @Override 456 public void enqueue(List<ByteBuffer> buffers) throws IOException { 457 queue.add(buffers); 458 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 459 debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); 460 } 461 462 @Override 463 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { 464 // Unordered frames are sent before existing frames. 465 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); 466 queue.addFirst(buffers); 467 debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); 468 } 469 470 @Override 471 public void signalEnqueued() throws IOException { 472 debug.log(Level.DEBUG, "signalling the publisher of the write queue"); 473 signal(); 474 } 475 } 476 477 String dbgTag = null; 478 final String dbgString() { 479 FlowTube flow = getConnectionFlow(); 480 String tag = dbgTag; 481 if (tag == null && flow != null) { 482 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; 483 } else if (tag == null) { 484 tag = this.getClass().getSimpleName() + "(?)"; 485 } 486 return tag; 487 } 488 489 @Override 490 public String toString() { 491 return "HttpConnection: " + channel().toString(); 492 } 493 }